datanode/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::kv_backend::KvBackendRef;
22use common_telemetry::error;
23use snafu::OptionExt;
24use store_api::storage::GcReport;
25
26mod apply_staging_manifest;
27mod close_region;
28mod downgrade_region;
29mod enter_staging;
30mod file_ref;
31mod flush_region;
32mod gc_worker;
33mod open_region;
34mod remap_manifest;
35mod sync_region;
36mod upgrade_region;
37
38use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
39use crate::heartbeat::handler::close_region::CloseRegionsHandler;
40use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
41use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
42use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
43use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
44use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
45use crate::heartbeat::handler::open_region::OpenRegionsHandler;
46use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
47use crate::heartbeat::handler::sync_region::SyncRegionHandler;
48use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
49use crate::heartbeat::task_tracker::TaskTracker;
50use crate::region_server::RegionServer;
51
52/// The handler for [`Instruction`]s.
53#[derive(Clone)]
54pub struct RegionHeartbeatResponseHandler {
55    region_server: RegionServer,
56    downgrade_tasks: TaskTracker<()>,
57    flush_tasks: TaskTracker<()>,
58    open_region_parallelism: usize,
59    gc_tasks: TaskTracker<GcReport>,
60    kv_backend: KvBackendRef,
61}
62
63#[async_trait::async_trait]
64pub trait InstructionHandler: Send + Sync {
65    type Instruction;
66    async fn handle(
67        &self,
68        ctx: &HandlerContext,
69        instruction: Self::Instruction,
70    ) -> Option<InstructionReply>;
71}
72
73#[derive(Clone)]
74pub struct HandlerContext {
75    pub region_server: RegionServer,
76    pub downgrade_tasks: TaskTracker<()>,
77    pub flush_tasks: TaskTracker<()>,
78    pub gc_tasks: TaskTracker<GcReport>,
79    pub kv_backend: KvBackendRef,
80}
81
82impl HandlerContext {
83    #[cfg(test)]
84    pub fn new_for_test(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
85        Self {
86            region_server,
87            downgrade_tasks: TaskTracker::new(),
88            flush_tasks: TaskTracker::new(),
89            gc_tasks: TaskTracker::new(),
90            kv_backend,
91        }
92    }
93}
94
95impl RegionHeartbeatResponseHandler {
96    /// Returns the [RegionHeartbeatResponseHandler].
97    pub fn new(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
98        Self {
99            region_server,
100            downgrade_tasks: TaskTracker::new(),
101            flush_tasks: TaskTracker::new(),
102            // Default to half of the number of CPUs.
103            open_region_parallelism: (num_cpus::get() / 2).max(1),
104            gc_tasks: TaskTracker::new(),
105            kv_backend,
106        }
107    }
108
109    /// Sets the parallelism for opening regions.
110    pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
111        self.open_region_parallelism = parallelism;
112        self
113    }
114
115    fn build_handler(
116        &self,
117        instruction: &Instruction,
118    ) -> MetaResult<Option<Box<InstructionHandlers>>> {
119        match instruction {
120            Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
121            Instruction::OpenRegions(_) => Ok(Some(Box::new(
122                OpenRegionsHandler {
123                    open_region_parallelism: self.open_region_parallelism,
124                }
125                .into(),
126            ))),
127            Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
128            Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
129            Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
130                UpgradeRegionsHandler {
131                    upgrade_region_parallelism: self.open_region_parallelism,
132                }
133                .into(),
134            ))),
135            Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
136            Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
137            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
138            Instruction::Suspend => Ok(None),
139            Instruction::EnterStagingRegions(_) => {
140                Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
141            }
142            Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
143            Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
144            Instruction::ApplyStagingManifests(_) => {
145                Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
146            }
147        }
148    }
149}
150
151#[allow(clippy::enum_variant_names)]
152pub enum InstructionHandlers {
153    CloseRegions(CloseRegionsHandler),
154    OpenRegions(OpenRegionsHandler),
155    FlushRegions(FlushRegionsHandler),
156    DowngradeRegions(DowngradeRegionsHandler),
157    UpgradeRegions(UpgradeRegionsHandler),
158    GetFileRefs(GetFileRefsHandler),
159    GcRegions(GcRegionsHandler),
160    EnterStagingRegions(EnterStagingRegionsHandler),
161    SyncRegions(SyncRegionHandler),
162    RemapManifest(RemapManifestHandler),
163    ApplyStagingManifests(ApplyStagingManifestsHandler),
164}
165
/// Implements `From<Handler> for InstructionHandlers` for each `Handler => Variant` pair, so
/// concrete handlers can be converted with `.into()`.
///
/// A trailing comma after the last pair is accepted, consistent with `dispatch_instr!`.
macro_rules! impl_from_handler {
    ($($handler:ident => $variant:ident),* $(,)?) => {
        $(
            impl From<$handler> for InstructionHandlers {
                fn from(handler: $handler) -> Self {
                    InstructionHandlers::$variant(handler)
                }
            }
        )*
    };
}
177
178impl_from_handler!(
179    CloseRegionsHandler => CloseRegions,
180    OpenRegionsHandler => OpenRegions,
181    FlushRegionsHandler => FlushRegions,
182    DowngradeRegionsHandler => DowngradeRegions,
183    UpgradeRegionsHandler => UpgradeRegions,
184    GetFileRefsHandler => GetFileRefs,
185    GcRegionsHandler => GcRegions,
186    EnterStagingRegionsHandler => EnterStagingRegions,
187    SyncRegionHandler => SyncRegions,
188    RemapManifestHandler => RemapManifest,
189    ApplyStagingManifestsHandler => ApplyStagingManifests
190);
191
/// Generates `InstructionHandlers::handle` and `InstructionHandlers::is_acceptable` from a list
/// of `Instruction variant => InstructionHandlers variant` pairs (trailing comma allowed).
macro_rules! dispatch_instr {
    (
        $( $instr_variant:ident => $handler_variant:ident ),* $(,)?
    ) => {
        impl InstructionHandlers {
            /// Runs `instruction` through the wrapped handler and returns its reply, if any.
            ///
            /// # Panics
            ///
            /// Panics if `instruction` does not belong to the wrapped handler's variant.
            /// Handlers must therefore be created by `build_handler` from the same instruction.
            pub async fn handle(
                &self,
                ctx: &HandlerContext,
                instruction: Instruction,
            ) -> Option<InstructionReply> {
                match (self, instruction) {
                    $(
                        (
                            InstructionHandlers::$handler_variant(handler),
                            Instruction::$instr_variant(instr),
                        ) => handler.handle(ctx, instr).await,
                    )*
                    // Invariant (not memory safety): `build_handler` always pairs an instruction
                    // with its own handler, so reaching this arm is a programming error.
                    _ => unreachable!(
                        "instruction does not match the handler; handlers must be built by \
                         `build_handler` from the same instruction"
                    ),
                }
            }

            /// Returns whether `instruction` has a handler in this dispatch table.
            pub fn is_acceptable(instruction: &Instruction) -> bool {
                matches!(
                    instruction,
                    $(
                        Instruction::$instr_variant { .. }
                    )|*
                )
            }
        }
    };
}
225
226dispatch_instr!(
227    CloseRegions => CloseRegions,
228    OpenRegions => OpenRegions,
229    FlushRegions => FlushRegions,
230    DowngradeRegions => DowngradeRegions,
231    UpgradeRegions => UpgradeRegions,
232    GetFileRefs => GetFileRefs,
233    GcRegions => GcRegions,
234    EnterStagingRegions => EnterStagingRegions,
235    SyncRegions => SyncRegions,
236    RemapManifest => RemapManifest,
237    ApplyStagingManifests => ApplyStagingManifests,
238);
239
240#[async_trait]
241impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
242    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
243        if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
244            return InstructionHandlers::is_acceptable(instruction);
245        }
246        false
247    }
248
249    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
250        let (meta, instruction) = ctx
251            .incoming_message
252            .take()
253            .context(InvalidHeartbeatResponseSnafu)?;
254
255        let mailbox = ctx.mailbox.clone();
256        if let Some(handler) = self.build_handler(&instruction)? {
257            let context = HandlerContext {
258                region_server: self.region_server.clone(),
259                downgrade_tasks: self.downgrade_tasks.clone(),
260                flush_tasks: self.flush_tasks.clone(),
261                gc_tasks: self.gc_tasks.clone(),
262                kv_backend: self.kv_backend.clone(),
263            };
264            let _handle = common_runtime::spawn_global(async move {
265                let reply = handler.handle(&context, instruction).await;
266                if let Some(reply) = reply
267                    && let Err(e) = mailbox.send((meta, reply)).await
268                {
269                    let error = e.to_string();
270                    let (meta, reply) = e.0;
271                    error!("Failed to send reply {reply} to {meta:?}: {error}");
272                }
273            });
274        }
275
276        Ok(HandleControl::Continue)
277    }
278}
279
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{
        DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion,
    };
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Captures the replies a [`RegionHeartbeatResponseHandler`] sends through its mailbox.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates a mailbox over a bounded channel (capacity 8) and keeps its receiving end.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);
            Self {
                mailbox: Arc::new(HeartbeatMailbox::new(sender)),
                receiver,
            }
        }

        /// Builds a handler context holding `incoming_message` that replies through this env.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Message metadata used for every instruction sent in these tests.
    fn test_meta() -> MessageMeta {
        MessageMeta::new_test(1, "test", "dn-1", "me-0")
    }

    /// Creates a heartbeat handler over `region_server`, backed by an in-memory kv backend.
    fn new_heartbeat_handler(region_server: &RegionServer) -> RegionHeartbeatResponseHandler {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend)
    }

    /// Feeds `instruction` to `handler`, checks that it asks to continue, and returns the reply
    /// it sends back through `env`'s mailbox.
    async fn handle_and_recv(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let mut ctx = env.create_handler_ctx((test_meta(), instruction));
        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);
        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    /// Identifies `region_id` on datanode 2 with the mito engine.
    fn region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![region_ident(region_id)])
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        Instruction::OpenRegions(vec![OpenRegion::new(
            region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        )])
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let heartbeat_handler = new_heartbeat_handler(&region_server);
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let region_id = RegionId::new(1024, 1);

        let instructions = [
            // Open region
            open_region_instruction(region_id, "test"),
            // Close region
            close_region_instruction(region_id),
            // Downgrade region
            Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id: RegionId::new(2048, 1),
                flush_timeout: Some(Duration::from_secs(1)),
            }]),
            // Upgrade region
            Instruction::UpgradeRegions(vec![UpgradeRegion {
                region_id,
                ..Default::default()
            }]),
            // Enter staging region
            Instruction::EnterStagingRegions(vec![EnterStagingRegion {
                region_id,
                partition_expr: "".to_string(),
            }]),
        ];

        for instruction in instructions {
            let ctx = heartbeat_env.create_handler_ctx((test_meta(), instruction));
            assert!(heartbeat_handler.is_acceptable(&ctx));
        }
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = new_heartbeat_handler(&region_server);

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Closing twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;
            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = new_heartbeat_handler(&region_server);

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Opening twice must succeed both times.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;
            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = new_heartbeat_handler(&region_server);

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        let reply = handle_and_recv(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, storage_path),
        )
        .await;
        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = new_heartbeat_handler(&region_server);

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Downgrading twice must succeed both times.
        for _ in 0..2 {
            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            }]);
            let reply = handle_and_recv(&heartbeat_handler, &mut heartbeat_env, instruction).await;
            let reply = &reply.expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 0);
        }

        // Downgrading a region that does not exist.
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        let reply = handle_and_recv(&heartbeat_handler, &mut heartbeat_env, instruction).await;
        let reply = reply.expect_downgrade_regions_reply();
        assert!(!reply[0].exists);
        assert!(reply[0].error.is_none());
        assert!(reply[0].last_entry_id.is_none());
    }
}