1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::consts::FILE_ENGINE;
22use common_error::ext::BoxedError;
23use common_recordbatch::SendableRecordBatchStream;
24use common_telemetry::{error, info};
25use object_store::ObjectStore;
26use snafu::{OptionExt, ensure};
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_engine::{
29 RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
30 RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
31 SetRegionRoleStateSuccess, SettableRegionRoleState, SinglePartitionScanner,
32 SyncManifestResponse,
33};
34use store_api::region_request::{
35 AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
36 RegionRequest,
37};
38use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
39use tokio::sync::Mutex;
40
41use crate::config::EngineConfig;
42use crate::error::{
43 RegionNotFoundSnafu, Result as EngineResult, UnexpectedEngineSnafu, UnsupportedSnafu,
44};
45use crate::region::{FileRegion, FileRegionRef};
46
47pub struct FileRegionEngine {
48 inner: EngineInnerRef,
49}
50
51impl FileRegionEngine {
52 pub fn new(_config: EngineConfig, object_store: ObjectStore) -> Self {
53 Self {
54 inner: Arc::new(EngineInner::new(object_store)),
55 }
56 }
57
58 async fn handle_query(
59 &self,
60 region_id: RegionId,
61 request: ScanRequest,
62 ) -> Result<SendableRecordBatchStream, BoxedError> {
63 self.inner
64 .get_region(region_id)
65 .await
66 .context(RegionNotFoundSnafu { region_id })
67 .map_err(BoxedError::new)?
68 .query(request)
69 .map_err(BoxedError::new)
70 }
71}
72
73#[async_trait]
74impl RegionEngine for FileRegionEngine {
75 fn name(&self) -> &str {
76 FILE_ENGINE
77 }
78
79 async fn handle_request(
80 &self,
81 region_id: RegionId,
82 request: RegionRequest,
83 ) -> Result<RegionResponse, BoxedError> {
84 self.inner
85 .handle_request(region_id, request)
86 .await
87 .map_err(BoxedError::new)
88 }
89
90 async fn handle_query(
91 &self,
92 region_id: RegionId,
93 request: ScanRequest,
94 ) -> Result<RegionScannerRef, BoxedError> {
95 let stream = self.handle_query(region_id, request).await?;
96 let metadata = self.get_metadata(region_id).await?;
97 let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
99 Ok(scanner)
100 }
101
102 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
103 self.inner
104 .get_region(region_id)
105 .await
106 .map(|r| r.metadata())
107 .context(RegionNotFoundSnafu { region_id })
108 .map_err(BoxedError::new)
109 }
110
111 async fn stop(&self) -> Result<(), BoxedError> {
112 self.inner.stop().await.map_err(BoxedError::new)
113 }
114
115 fn region_statistic(&self, _: RegionId) -> Option<RegionStatistic> {
116 None
117 }
118
119 async fn get_committed_sequence(&self, _: RegionId) -> Result<SequenceNumber, BoxedError> {
120 Ok(Default::default())
121 }
122
123 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
124 self.inner
125 .set_region_role(region_id, role)
126 .map_err(BoxedError::new)
127 }
128
129 async fn set_region_role_state_gracefully(
130 &self,
131 region_id: RegionId,
132 _region_role_state: SettableRegionRoleState,
133 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
134 let exists = self.inner.get_region(region_id).await.is_some();
135
136 if exists {
137 Ok(SetRegionRoleStateResponse::success(
138 SetRegionRoleStateSuccess::file(),
139 ))
140 } else {
141 Ok(SetRegionRoleStateResponse::NotFound)
142 }
143 }
144
145 async fn sync_region(
146 &self,
147 _region_id: RegionId,
148 _manifest_info: RegionManifestInfo,
149 ) -> Result<SyncManifestResponse, BoxedError> {
150 Ok(SyncManifestResponse::NotSupported)
152 }
153
154 async fn remap_manifests(
155 &self,
156 _request: RemapManifestsRequest,
157 ) -> Result<RemapManifestsResponse, BoxedError> {
158 Err(BoxedError::new(
159 UnsupportedSnafu {
160 operation: "remap_manifests",
161 }
162 .build(),
163 ))
164 }
165
166 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
167 self.inner.state(region_id)
168 }
169
170 fn as_any(&self) -> &dyn Any {
171 self
172 }
173}
174
175struct EngineInner {
176 regions: RwLock<HashMap<RegionId, FileRegionRef>>,
180
181 region_mutex: Mutex<()>,
184
185 object_store: ObjectStore,
186}
187
188type EngineInnerRef = Arc<EngineInner>;
189
190impl EngineInner {
191 fn new(object_store: ObjectStore) -> Self {
192 Self {
193 regions: RwLock::new(HashMap::new()),
194 region_mutex: Mutex::new(()),
195 object_store,
196 }
197 }
198
199 async fn handle_request(
200 &self,
201 region_id: RegionId,
202 request: RegionRequest,
203 ) -> EngineResult<RegionResponse> {
204 let result = match request {
205 RegionRequest::Create(req) => self.handle_create(region_id, req).await,
206 RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
207 RegionRequest::Open(req) => self.handle_open(region_id, req).await,
208 RegionRequest::Close(req) => self.handle_close(region_id, req).await,
209 _ => UnsupportedSnafu {
210 operation: request.to_string(),
211 }
212 .fail(),
213 };
214 result.map(RegionResponse::new)
215 }
216
217 async fn stop(&self) -> EngineResult<()> {
218 let _lock = self.region_mutex.lock().await;
219 self.regions.write().unwrap().clear();
220 Ok(())
221 }
222
223 fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> {
224 Ok(())
226 }
227
228 fn state(&self, region_id: RegionId) -> Option<RegionRole> {
229 if self.regions.read().unwrap().get(®ion_id).is_some() {
230 Some(RegionRole::Leader)
231 } else {
232 None
233 }
234 }
235}
236
237impl EngineInner {
238 async fn handle_create(
239 &self,
240 region_id: RegionId,
241 request: RegionCreateRequest,
242 ) -> EngineResult<AffectedRows> {
243 ensure!(
244 request.engine == FILE_ENGINE,
245 UnexpectedEngineSnafu {
246 engine: request.engine
247 }
248 );
249
250 if self.exists(region_id).await {
251 return Ok(0);
252 }
253
254 info!("Try to create region, region_id: {}", region_id);
255
256 let _lock = self.region_mutex.lock().await;
257 if self.exists(region_id).await {
259 return Ok(0);
260 }
261
262 let res = FileRegion::create(region_id, request, &self.object_store).await;
263 let region = res.inspect_err(|err| {
264 error!(
265 err;
266 "Failed to create region, region_id: {}",
267 region_id
268 );
269 })?;
270 self.regions.write().unwrap().insert(region_id, region);
271
272 info!("A new region is created, region_id: {}", region_id);
273 Ok(0)
274 }
275
276 async fn handle_open(
277 &self,
278 region_id: RegionId,
279 request: RegionOpenRequest,
280 ) -> EngineResult<AffectedRows> {
281 if self.exists(region_id).await {
282 return Ok(0);
283 }
284
285 info!("Try to open region, region_id: {}", region_id);
286
287 let _lock = self.region_mutex.lock().await;
288 if self.exists(region_id).await {
290 return Ok(0);
291 }
292
293 let res = FileRegion::open(region_id, request, &self.object_store).await;
294 let region = res.inspect_err(|err| {
295 error!(
296 err;
297 "Failed to open region, region_id: {}",
298 region_id
299 );
300 })?;
301 self.regions.write().unwrap().insert(region_id, region);
302
303 info!("Region opened, region_id: {}", region_id);
304 Ok(0)
305 }
306
307 async fn handle_close(
308 &self,
309 region_id: RegionId,
310 _request: RegionCloseRequest,
311 ) -> EngineResult<AffectedRows> {
312 let _lock = self.region_mutex.lock().await;
313
314 let mut regions = self.regions.write().unwrap();
315 if regions.remove(®ion_id).is_some() {
316 info!("Region closed, region_id: {}", region_id);
317 }
318
319 Ok(0)
320 }
321
322 async fn handle_drop(
323 &self,
324 region_id: RegionId,
325 _request: RegionDropRequest,
326 ) -> EngineResult<AffectedRows> {
327 if !self.exists(region_id).await {
328 return RegionNotFoundSnafu { region_id }.fail();
329 }
330
331 info!("Try to drop region, region_id: {}", region_id);
332
333 let _lock = self.region_mutex.lock().await;
334
335 let region = self.get_region(region_id).await;
336 if let Some(region) = region {
337 let res = FileRegion::drop(®ion, &self.object_store).await;
338 res.inspect_err(|err| {
339 error!(
340 err;
341 "Failed to drop region, region_id: {}",
342 region_id
343 );
344 })?;
345 }
346 let _ = self.regions.write().unwrap().remove(®ion_id);
347
348 info!("Region dropped, region_id: {}", region_id);
349 Ok(0)
350 }
351
352 async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
353 self.regions.read().unwrap().get(®ion_id).cloned()
354 }
355
356 async fn exists(&self, region_id: RegionId) -> bool {
357 self.regions.read().unwrap().contains_key(®ion_id)
358 }
359}