datanode/heartbeat/handler/
close_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::RegionIdent;
16use common_meta::instruction::{InstructionReply, SimpleReply};
17use common_telemetry::warn;
18use futures::future::join_all;
19use store_api::region_request::{RegionCloseRequest, RegionRequest};
20use store_api::storage::RegionId;
21
22use crate::error;
23use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
24
/// Stateless handler for the "close regions" heartbeat instruction.
///
/// The type carries no data; all behavior lives in its
/// [`InstructionHandler`] implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct CloseRegionsHandler;
27
28#[async_trait::async_trait]
29impl InstructionHandler for CloseRegionsHandler {
30    type Instruction = Vec<RegionIdent>;
31
32    async fn handle(
33        &self,
34        ctx: &HandlerContext,
35        region_idents: Self::Instruction,
36    ) -> Option<InstructionReply> {
37        let region_ids = region_idents
38            .into_iter()
39            .map(|region_ident| RegionId::new(region_ident.table_id, region_ident.region_number))
40            .collect::<Vec<_>>();
41
42        let futs = region_ids.iter().map(|region_id| {
43            ctx.region_server
44                .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
45        });
46
47        let results = join_all(futs).await;
48
49        let mut errors = vec![];
50        for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
51            match result {
52                Ok(_) => (),
53                Err(error::Error::RegionNotFound { .. }) => {
54                    warn!(
55                        "Received a close regions instruction from meta, but target region:{} is not found.",
56                        region_id
57                    );
58                }
59                Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
60            }
61        }
62
63        if errors.is_empty() {
64            return Some(InstructionReply::CloseRegions(SimpleReply {
65                result: true,
66                error: None,
67            }));
68        }
69
70        Some(InstructionReply::CloseRegions(SimpleReply {
71            result: false,
72            error: Some(errors.join("; ")),
73        }))
74    }
75}
76
77#[cfg(test)]
78mod tests {
79    use std::assert_matches;
80    use std::sync::Arc;
81
82    use assert_matches::assert_matches;
83    use common_meta::RegionIdent;
84    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
85    use common_meta::heartbeat::mailbox::MessageMeta;
86    use common_meta::instruction::Instruction;
87    use mito2::config::MitoConfig;
88    use mito2::engine::MITO_ENGINE_NAME;
89    use mito2::test_util::{CreateRequestBuilder, TestEnv};
90    use store_api::region_request::RegionRequest;
91    use store_api::storage::RegionId;
92
93    use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
94    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
95    use crate::tests::mock_region_server;
96
97    fn close_regions_instruction(region_ids: impl IntoIterator<Item = RegionId>) -> Instruction {
98        let region_idents = region_ids
99            .into_iter()
100            .map(|region_id| RegionIdent {
101                table_id: region_id.table_id(),
102                region_number: region_id.region_number(),
103                datanode_id: 2,
104                engine: MITO_ENGINE_NAME.to_string(),
105            })
106            .collect();
107
108        Instruction::CloseRegions(region_idents)
109    }
110
111    #[tokio::test]
112    async fn test_close_regions() {
113        common_telemetry::init_default_ut_logging();
114
115        let mut region_server = mock_region_server();
116        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
117        let mut engine_env = TestEnv::with_prefix("close-regions").await;
118        let engine = engine_env.create_engine(MitoConfig::default()).await;
119        region_server.register_engine(Arc::new(engine.clone()));
120        let region_id = RegionId::new(1024, 1);
121        let region_id1 = RegionId::new(1024, 2);
122
123        let builder = CreateRequestBuilder::new();
124        let create_req = builder.build();
125        region_server
126            .handle_request(region_id, RegionRequest::Create(create_req))
127            .await
128            .unwrap();
129
130        let create_req1 = builder.build();
131        region_server
132            .handle_request(region_id1, RegionRequest::Create(create_req1))
133            .await
134            .unwrap();
135        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
136        let instruction =
137            close_regions_instruction([region_id, region_id1, RegionId::new(1024, 3)]);
138        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
139        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
140        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
141        assert_matches!(control, HandleControl::Continue);
142
143        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
144        let reply = reply.expect_close_regions_reply();
145        assert!(reply.result);
146        assert!(reply.error.is_none());
147        assert!(!engine.is_region_exists(region_id));
148        assert!(!engine.is_region_exists(region_id1));
149        assert!(!engine.is_region_exists(RegionId::new(1024, 3)));
150    }
151}