datanode/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18    HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::kv_backend::KvBackendRef;
22use common_telemetry::error;
23use common_telemetry::tracing_context::FutureExt;
24use snafu::OptionExt;
25use store_api::storage::GcReport;
26use strum::AsRefStr;
27
28mod apply_staging_manifest;
29mod close_region;
30mod downgrade_region;
31mod enter_staging;
32mod file_ref;
33mod flush_region;
34mod gc_worker;
35mod open_region;
36mod remap_manifest;
37mod sync_region;
38mod upgrade_region;
39
40use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
41use crate::heartbeat::handler::close_region::CloseRegionsHandler;
42use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
43use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
44use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
45use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
46use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
47use crate::heartbeat::handler::open_region::OpenRegionsHandler;
48use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
49use crate::heartbeat::handler::sync_region::SyncRegionHandler;
50use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
51use crate::heartbeat::task_tracker::TaskTracker;
52use crate::region_server::RegionServer;
53
/// The handler for [`Instruction`]s.
///
/// For every handled message the shared fields below are cloned into a
/// [`HandlerContext`] that is passed to the instruction-specific handler.
#[derive(Clone)]
pub struct RegionHeartbeatResponseHandler {
    /// Region server, exposed to handlers as `HandlerContext::region_server`.
    region_server: RegionServer,
    /// Task tracker exposed to handlers as `HandlerContext::downgrade_tasks`.
    downgrade_tasks: TaskTracker<()>,
    /// Task tracker exposed to handlers as `HandlerContext::flush_tasks`.
    flush_tasks: TaskTracker<()>,
    /// Parallelism given to the open-region handler; the upgrade-region
    /// handler is built with the same value.
    open_region_parallelism: usize,
    /// Task tracker exposed to handlers as `HandlerContext::gc_tasks`.
    gc_tasks: TaskTracker<GcReport>,
    /// Key-value backend, exposed to handlers as `HandlerContext::kv_backend`.
    kv_backend: KvBackendRef,
}
64
65#[async_trait::async_trait]
66pub trait InstructionHandler: Send + Sync {
67    type Instruction;
68    async fn handle(
69        &self,
70        ctx: &HandlerContext,
71        instruction: Self::Instruction,
72    ) -> Option<InstructionReply>;
73}
74
/// Shared state passed to an [`InstructionHandler`] when it handles an
/// instruction.
#[derive(Clone)]
pub struct HandlerContext {
    /// Region server available to the handler.
    pub region_server: RegionServer,
    /// Task tracker for downgrade work (presumably keyed per region — confirm
    /// against `TaskTracker`).
    pub downgrade_tasks: TaskTracker<()>,
    /// Task tracker for flush work.
    pub flush_tasks: TaskTracker<()>,
    /// Task tracker for GC work; its tasks are typed with [`GcReport`].
    pub gc_tasks: TaskTracker<GcReport>,
    /// Key-value backend available to the handler.
    pub kv_backend: KvBackendRef,
}
83
84impl HandlerContext {
85    #[cfg(test)]
86    pub fn new_for_test(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
87        Self {
88            region_server,
89            downgrade_tasks: TaskTracker::new(),
90            flush_tasks: TaskTracker::new(),
91            gc_tasks: TaskTracker::new(),
92            kv_backend,
93        }
94    }
95}
96
97impl RegionHeartbeatResponseHandler {
98    /// Returns the [RegionHeartbeatResponseHandler].
99    pub fn new(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
100        Self {
101            region_server,
102            downgrade_tasks: TaskTracker::new(),
103            flush_tasks: TaskTracker::new(),
104            // Default to half of the number of CPUs.
105            open_region_parallelism: (num_cpus::get() / 2).max(1),
106            gc_tasks: TaskTracker::new(),
107            kv_backend,
108        }
109    }
110
111    /// Sets the parallelism for opening regions.
112    pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
113        self.open_region_parallelism = parallelism;
114        self
115    }
116
117    fn build_handler(
118        &self,
119        instruction: &Instruction,
120    ) -> MetaResult<Option<Box<InstructionHandlers>>> {
121        match instruction {
122            Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
123            Instruction::OpenRegions(_) => Ok(Some(Box::new(
124                OpenRegionsHandler {
125                    open_region_parallelism: self.open_region_parallelism,
126                }
127                .into(),
128            ))),
129            Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
130            Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
131            Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
132                UpgradeRegionsHandler {
133                    upgrade_region_parallelism: self.open_region_parallelism,
134                }
135                .into(),
136            ))),
137            Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
138            Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
139            Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
140            Instruction::Suspend => Ok(None),
141            Instruction::EnterStagingRegions(_) => {
142                Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
143            }
144            Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
145            Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
146            Instruction::ApplyStagingManifests(_) => {
147                Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
148            }
149        }
150    }
151}
152
/// The set of instruction handlers, one variant per handled instruction kind,
/// each wrapping that kind's handler.
///
/// `AsRefStr` provides the string form returned by `as_ref_str`.
// The variant names follow the paired `Instruction` variants, so the shared
// `Regions` suffix is intentional and the lint is silenced.
#[allow(clippy::enum_variant_names)]
#[derive(AsRefStr)]
pub enum InstructionHandlers {
    CloseRegions(CloseRegionsHandler),
    OpenRegions(OpenRegionsHandler),
    FlushRegions(FlushRegionsHandler),
    DowngradeRegions(DowngradeRegionsHandler),
    UpgradeRegions(UpgradeRegionsHandler),
    GetFileRefs(GetFileRefsHandler),
    GcRegions(GcRegionsHandler),
    EnterStagingRegions(EnterStagingRegionsHandler),
    SyncRegions(SyncRegionHandler),
    RemapManifest(RemapManifestHandler),
    ApplyStagingManifests(ApplyStagingManifestsHandler),
}
168
169impl InstructionHandlers {
170    // Returns the string representation of the instruction handler.
171    pub fn as_ref_str(&self) -> &str {
172        self.as_ref()
173    }
174}
175
// Implements `From<Handler> for InstructionHandlers` for every
// `HandlerType => Variant` pair by wrapping the handler in the named variant.
//
// Pairs are comma separated; a trailing comma after the last pair is accepted,
// in line with `dispatch_instr!`.
macro_rules! impl_from_handler {
    ($($handler:ident => $variant:ident),* $(,)?) => {
        $(
            impl From<$handler> for InstructionHandlers {
                fn from(handler: $handler) -> Self {
                    InstructionHandlers::$variant(handler)
                }
            }
        )*
    };
}
187
// Generate `From<Handler> for InstructionHandlers` for every handler type
// (left) into its enum variant (right); `build_handler` relies on these
// conversions to wrap the handler it creates.
impl_from_handler!(
    CloseRegionsHandler => CloseRegions,
    OpenRegionsHandler => OpenRegions,
    FlushRegionsHandler => FlushRegions,
    DowngradeRegionsHandler => DowngradeRegions,
    UpgradeRegionsHandler => UpgradeRegions,
    GetFileRefsHandler => GetFileRefs,
    GcRegionsHandler => GcRegions,
    EnterStagingRegionsHandler => EnterStagingRegions,
    SyncRegionHandler => SyncRegions,
    RemapManifestHandler => RemapManifest,
    ApplyStagingManifestsHandler => ApplyStagingManifests
);
201
// Generates, from a list of `InstructionVariant => HandlerVariant` pairs:
// - `InstructionHandlers::handle`, which forwards an instruction to the
//   handler wrapped by the paired variant;
// - `InstructionHandlers::is_acceptable`, which reports whether an
//   instruction's variant appears in the list.
macro_rules! dispatch_instr {
    (
        $( $instr_variant:ident => $handler_variant:ident ),* $(,)?
    ) => {
        impl InstructionHandlers {
            /// Forwards `instruction` to the wrapped handler and returns its reply.
            ///
            /// # Panics
            ///
            /// Panics if `instruction` is not the variant paired with this
            /// handler; the message names the handler to ease diagnosis.
            pub async fn handle(
                &self,
                ctx: &HandlerContext,
                instruction: Instruction,
            ) -> Option<InstructionReply> {
                match (self, instruction) {
                    $(
                        (
                            InstructionHandlers::$handler_variant(handler),
                            Instruction::$instr_variant(instr),
                        ) => handler.handle(ctx, instr).await,
                    )*
                    // Invariant: must be used in pairs with `build_handler`,
                    // i.e. the handler was built from this very instruction.
                    (handler, _) => unreachable!(
                        "instruction does not match handler `{}`",
                        handler.as_ref_str()
                    ),
                }
            }

            /// Check whether this instruction is acceptable by any handler.
            pub fn is_acceptable(instruction: &Instruction) -> bool {
                matches!(
                    instruction,
                    $(
                        Instruction::$instr_variant { .. }
                    )|*
                )
            }
        }
    };
}
235
// Pair each `Instruction` variant (left) with the `InstructionHandlers`
// variant (right) that handles it. Instruction variants not listed here are
// reported as not acceptable by `InstructionHandlers::is_acceptable`.
dispatch_instr!(
    CloseRegions => CloseRegions,
    OpenRegions => OpenRegions,
    FlushRegions => FlushRegions,
    DowngradeRegions => DowngradeRegions,
    UpgradeRegions => UpgradeRegions,
    GetFileRefs => GetFileRefs,
    GcRegions => GcRegions,
    EnterStagingRegions => EnterStagingRegions,
    SyncRegions => SyncRegions,
    RemapManifest => RemapManifest,
    ApplyStagingManifests => ApplyStagingManifests,
);
249
250#[async_trait]
251impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
252    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
253        if let Some((_, _, instruction)) = ctx.incoming_message.as_ref() {
254            return InstructionHandlers::is_acceptable(instruction);
255        }
256        false
257    }
258
259    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
260        let (meta, tracing_ctx, instruction) = ctx
261            .incoming_message
262            .take()
263            .context(InvalidHeartbeatResponseSnafu)?;
264
265        let mailbox = ctx.mailbox.clone();
266        if let Some(handler) = self.build_handler(&instruction)? {
267            let context = HandlerContext {
268                region_server: self.region_server.clone(),
269                downgrade_tasks: self.downgrade_tasks.clone(),
270                flush_tasks: self.flush_tasks.clone(),
271                gc_tasks: self.gc_tasks.clone(),
272                kv_backend: self.kv_backend.clone(),
273            };
274            let span = tracing_ctx.attach(tracing::info_span!(
275                "RegionHeartbeatResponseHandler::handle",
276                from = %meta.from,
277                to = %meta.to,
278                handler = %handler.as_ref_str(),
279            ));
280            let _handle = common_runtime::spawn_global(async move {
281                let reply = handler.handle(&context, instruction).trace(span).await;
282                if let Some(reply) = reply
283                    && let Err(e) = mailbox.send((meta, reply)).await
284                {
285                    let error = e.to_string();
286                    let (meta, reply) = e.0;
287                    error!("Failed to send reply {reply} to {meta:?}: {error}");
288                }
289            });
290        }
291
292        Ok(HandleControl::Continue)
293    }
294}
295
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{
        DowngradeRegion, EnterStagingRegion, OpenRegion, StagingPartitionDirective, UpgradeRegion,
    };
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Test fixture pairing a mailbox with the receiving end of its channel,
    /// so tests can read the replies sent by handlers.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates the fixture on top of a bounded channel of capacity 8.
        pub fn new() -> Self {
            let (tx, rx) = mpsc::channel(8);
            let mailbox = Arc::new(HeartbeatMailbox::new(tx));

            HeartbeatResponseTestEnv {
                mailbox,
                receiver: rx,
            }
        }

        /// Wraps `incoming_message` in a handler context that uses this
        /// fixture's mailbox.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Checks which incoming messages the handler accepts and rejects.
    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

        // Open region
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";
        let instruction = open_region_instruction(region_id, storage_path);
        assert!(
            heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((
                meta.clone(),
                Default::default(),
                instruction
            )))
        );

        // Close region
        let instruction = close_region_instruction(region_id);
        assert!(
            heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((
                meta.clone(),
                Default::default(),
                instruction
            )))
        );

        // Downgrade region
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        assert!(
            heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((
                meta.clone(),
                Default::default(),
                instruction
            )))
        );

        // Upgrade region
        let instruction = Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id,
            ..Default::default()
        }]);
        assert!(
            heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((
                meta.clone(),
                Default::default(),
                instruction
            )))
        );

        // Enter staging region
        let instruction = Instruction::EnterStagingRegions(vec![EnterStagingRegion {
            region_id,
            partition_directive: StagingPartitionDirective::UpdatePartitionExpr("".to_string()),
        }]);
        assert!(
            heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((
                meta.clone(),
                Default::default(),
                instruction
            )))
        );

        // Suspend has no region handler, so it must be rejected.
        assert!(
            !heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((
                meta,
                Default::default(),
                Instruction::Suspend
            )))
        );

        // A context without an incoming message is never acceptable.
        let empty_ctx = HeartbeatResponseHandlerContext {
            mailbox: heartbeat_env.mailbox.clone(),
            response: Default::default(),
            incoming_message: None,
        };
        assert!(!heartbeat_handler.is_acceptable(&empty_ctx));
    }

    /// Builds a `CloseRegions` instruction targeting `region_id`.
    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }])
    }

    /// Builds an `OpenRegions` instruction for `region_id` stored under `path`.
    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        Instruction::OpenRegions(vec![OpenRegion::new(
            RegionIdent {
                table_id: region_id.table_id(),
                region_number: region_id.region_number(),
                datanode_id: 2,
                engine: MITO_ENGINE_NAME.to_string(),
            },
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        )])
    }

    /// Closing a region succeeds, and closing it a second time succeeds too.
    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to close it twice.
        for _ in 0..2 {
            let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
            let instruction = close_region_instruction(region_id);

            let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
            let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
            assert_matches!(control, HandleControl::Continue);

            let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

            if let InstructionReply::CloseRegions(reply) = reply {
                assert!(reply.result);
                assert!(reply.error.is_none());
            } else {
                unreachable!()
            }

            // The region must be gone from the server after the close.
            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    /// Opening an existing (closed) region succeeds, also when repeated.
    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let builder = CreateRequestBuilder::new();
        let mut create_req = builder.build();
        let storage_path = "test";
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        // Close the region first so that the instruction has something to open.
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to open it twice.
        for _ in 0..2 {
            let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
            let instruction = open_region_instruction(region_id, storage_path);

            let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
            let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
            assert_matches!(control, HandleControl::Continue);

            let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

            if let InstructionReply::OpenRegions(reply) = reply {
                assert!(reply.result);
                assert!(reply.error.is_none());
            } else {
                unreachable!()
            }
        }
    }

    /// Opening a region that was never created yields a failed reply.
    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
        let instruction = open_region_instruction(region_id, storage_path);

        let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

        if let InstructionReply::OpenRegions(reply) = reply {
            assert!(!reply.result);
            assert!(reply.error.is_some());
        } else {
            unreachable!()
        }
    }

    /// Downgrading an existing region succeeds (also when repeated), while a
    /// missing region is reported as not existing without an error.
    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let builder = CreateRequestBuilder::new();
        let mut create_req = builder.build();
        let storage_path = "test";
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Should be ok, if we try to downgrade it twice.
        for _ in 0..2 {
            let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            }]);

            let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
            let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
            assert_matches!(control, HandleControl::Continue);

            let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

            let reply = &reply.expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 0);
        }

        // Downgrades a not exists region.
        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();

        let reply = reply.expect_downgrade_regions_reply();
        assert!(!reply[0].exists);
        assert!(reply[0].error.is_none());
        assert!(reply[0].last_entry_id.is_none());
    }
}
645}