datanode/heartbeat/handler/
open_region.rs1use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
16use common_meta::wal_provider::prepare_wal_options;
17use store_api::path_utils::table_dir;
18use store_api::region_request::{PathType, RegionOpenRequest};
19use store_api::storage::RegionId;
20
21use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
22
23pub struct OpenRegionsHandler {
24 pub open_region_parallelism: usize,
25}
26
27#[async_trait::async_trait]
28impl InstructionHandler for OpenRegionsHandler {
29 type Instruction = Vec<OpenRegion>;
30 async fn handle(
31 &self,
32 ctx: &HandlerContext,
33 open_regions: Self::Instruction,
34 ) -> Option<InstructionReply> {
35 let requests = open_regions
36 .into_iter()
37 .map(|open_region| {
38 let OpenRegion {
39 region_ident,
40 region_storage_path,
41 mut region_options,
42 region_wal_options,
43 skip_wal_replay,
44 } = open_region;
45 let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
46 prepare_wal_options(&mut region_options, region_id, ®ion_wal_options);
47 let request = RegionOpenRequest {
48 engine: region_ident.engine,
49 table_dir: table_dir(®ion_storage_path, region_id.table_id()),
50 path_type: PathType::Bare,
51 options: region_options,
52 skip_wal_replay,
53 checkpoint: None,
54 };
55 (region_id, request)
56 })
57 .collect::<Vec<_>>();
58
59 let result = ctx
60 .region_server
61 .handle_batch_open_requests(self.open_region_parallelism, requests, false)
62 .await;
63 let success = result.is_ok();
64 let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
65
66 Some(InstructionReply::OpenRegions(SimpleReply {
67 result: success,
68 error,
69 }))
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use std::assert_matches::assert_matches;
76 use std::collections::HashMap;
77 use std::sync::Arc;
78
79 use common_meta::RegionIdent;
80 use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
81 use common_meta::heartbeat::mailbox::MessageMeta;
82 use common_meta::instruction::{Instruction, OpenRegion};
83 use common_meta::kv_backend::memory::MemoryKvBackend;
84 use mito2::config::MitoConfig;
85 use mito2::engine::MITO_ENGINE_NAME;
86 use mito2::test_util::{CreateRequestBuilder, TestEnv};
87 use store_api::path_utils::table_dir;
88 use store_api::region_request::{RegionCloseRequest, RegionRequest};
89 use store_api::storage::RegionId;
90
91 use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
92 use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
93 use crate::tests::mock_region_server;
94
95 fn open_regions_instruction(
96 region_ids: impl IntoIterator<Item = RegionId>,
97 storage_path: &str,
98 ) -> Instruction {
99 let region_idents = region_ids
100 .into_iter()
101 .map(|region_id| OpenRegion {
102 region_ident: RegionIdent {
103 datanode_id: 0,
104 table_id: region_id.table_id(),
105 region_number: region_id.region_number(),
106 engine: MITO_ENGINE_NAME.to_string(),
107 },
108 region_storage_path: storage_path.to_string(),
109 region_options: HashMap::new(),
110 region_wal_options: HashMap::new(),
111 skip_wal_replay: false,
112 })
113 .collect();
114
115 Instruction::OpenRegions(region_idents)
116 }
117
118 #[tokio::test]
119 async fn test_open_regions() {
120 common_telemetry::init_default_ut_logging();
121
122 let mut region_server = mock_region_server();
123 let kv_backend = Arc::new(MemoryKvBackend::new());
124 let heartbeat_handler =
125 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
126 let mut engine_env = TestEnv::with_prefix("open-regions").await;
127 let engine = engine_env.create_engine(MitoConfig::default()).await;
128 region_server.register_engine(Arc::new(engine.clone()));
129 let region_id = RegionId::new(1024, 1);
130 let region_id1 = RegionId::new(1024, 2);
131 let storage_path = "test";
132 let builder = CreateRequestBuilder::new();
133 let mut create_req = builder.build();
134 create_req.table_dir = table_dir(storage_path, region_id.table_id());
135 region_server
136 .handle_request(region_id, RegionRequest::Create(create_req))
137 .await
138 .unwrap();
139 let mut create_req1 = builder.build();
140 create_req1.table_dir = table_dir(storage_path, region_id1.table_id());
141 region_server
142 .handle_request(region_id1, RegionRequest::Create(create_req1))
143 .await
144 .unwrap();
145 region_server
146 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
147 .await
148 .unwrap();
149 region_server
150 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
151 .await
152 .unwrap();
153
154 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
155 let instruction = open_regions_instruction([region_id, region_id1], storage_path);
156 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
157 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
158 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
159 assert_matches!(control, HandleControl::Continue);
160 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
161
162 let reply = reply.expect_open_regions_reply();
163 assert!(reply.result);
164 assert!(reply.error.is_none());
165
166 assert!(engine.is_region_exists(region_id));
167 assert!(engine.is_region_exists(region_id1));
168 }
169}