datanode/heartbeat/handler/
open_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
16use common_meta::wal_options_allocator::prepare_wal_options;
17use store_api::path_utils::table_dir;
18use store_api::region_request::{PathType, RegionOpenRequest};
19use store_api::storage::RegionId;
20
21use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
22
/// Heartbeat instruction handler that opens a batch of regions on this datanode.
pub struct OpenRegionsHandler {
    /// Parallelism value forwarded as the first argument of the region server's
    /// `handle_batch_open_requests` call.
    ///
    /// NOTE(review): presumably the maximum number of regions opened
    /// concurrently — confirm against the region server implementation.
    pub open_region_parallelism: usize,
}
26
27#[async_trait::async_trait]
28impl InstructionHandler for OpenRegionsHandler {
29    async fn handle(
30        &self,
31        ctx: &HandlerContext,
32        instruction: Instruction,
33    ) -> Option<InstructionReply> {
34        let open_regions = instruction.into_open_regions().unwrap();
35
36        let requests = open_regions
37            .into_iter()
38            .map(|open_region| {
39                let OpenRegion {
40                    region_ident,
41                    region_storage_path,
42                    mut region_options,
43                    region_wal_options,
44                    skip_wal_replay,
45                } = open_region;
46                let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
47                prepare_wal_options(&mut region_options, region_id, &region_wal_options);
48                let request = RegionOpenRequest {
49                    engine: region_ident.engine,
50                    table_dir: table_dir(&region_storage_path, region_id.table_id()),
51                    path_type: PathType::Bare,
52                    options: region_options,
53                    skip_wal_replay,
54                    checkpoint: None,
55                };
56                (region_id, request)
57            })
58            .collect::<Vec<_>>();
59
60        let result = ctx
61            .region_server
62            .handle_batch_open_requests(self.open_region_parallelism, requests, false)
63            .await;
64        let success = result.is_ok();
65        let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
66
67        Some(InstructionReply::OpenRegions(SimpleReply {
68            result: success,
69            error,
70        }))
71    }
72}
73
74#[cfg(test)]
75mod tests {
76    use std::assert_matches::assert_matches;
77    use std::collections::HashMap;
78    use std::sync::Arc;
79
80    use common_meta::RegionIdent;
81    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
82    use common_meta::heartbeat::mailbox::MessageMeta;
83    use common_meta::instruction::{Instruction, OpenRegion};
84    use mito2::config::MitoConfig;
85    use mito2::engine::MITO_ENGINE_NAME;
86    use mito2::test_util::{CreateRequestBuilder, TestEnv};
87    use store_api::path_utils::table_dir;
88    use store_api::region_request::{RegionCloseRequest, RegionRequest};
89    use store_api::storage::RegionId;
90
91    use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
92    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
93    use crate::tests::mock_region_server;
94
95    fn open_regions_instruction(
96        region_ids: impl IntoIterator<Item = RegionId>,
97        storage_path: &str,
98    ) -> Instruction {
99        let region_idents = region_ids
100            .into_iter()
101            .map(|region_id| OpenRegion {
102                region_ident: RegionIdent {
103                    datanode_id: 0,
104                    table_id: region_id.table_id(),
105                    region_number: region_id.region_number(),
106                    engine: MITO_ENGINE_NAME.to_string(),
107                },
108                region_storage_path: storage_path.to_string(),
109                region_options: HashMap::new(),
110                region_wal_options: HashMap::new(),
111                skip_wal_replay: false,
112            })
113            .collect();
114
115        Instruction::OpenRegions(region_idents)
116    }
117
118    #[tokio::test]
119    async fn test_open_regions() {
120        common_telemetry::init_default_ut_logging();
121
122        let mut region_server = mock_region_server();
123        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
124        let mut engine_env = TestEnv::with_prefix("open-regions").await;
125        let engine = engine_env.create_engine(MitoConfig::default()).await;
126        region_server.register_engine(Arc::new(engine.clone()));
127        let region_id = RegionId::new(1024, 1);
128        let region_id1 = RegionId::new(1024, 2);
129        let storage_path = "test";
130        let builder = CreateRequestBuilder::new();
131        let mut create_req = builder.build();
132        create_req.table_dir = table_dir(storage_path, region_id.table_id());
133        region_server
134            .handle_request(region_id, RegionRequest::Create(create_req))
135            .await
136            .unwrap();
137        let mut create_req1 = builder.build();
138        create_req1.table_dir = table_dir(storage_path, region_id1.table_id());
139        region_server
140            .handle_request(region_id1, RegionRequest::Create(create_req1))
141            .await
142            .unwrap();
143        region_server
144            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
145            .await
146            .unwrap();
147        region_server
148            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
149            .await
150            .unwrap();
151
152        let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
153        let instruction = open_regions_instruction([region_id, region_id1], storage_path);
154        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
155        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
156        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
157        assert_matches!(control, HandleControl::Continue);
158        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
159
160        let reply = reply.expect_open_regions_reply();
161        assert!(reply.result);
162        assert!(reply.error.is_none());
163
164        assert!(engine.is_region_exists(region_id));
165        assert!(engine.is_region_exists(region_id1));
166    }
167}