1use std::any::Any;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::consts::FILE_ENGINE;
22use common_error::ext::BoxedError;
23use common_recordbatch::SendableRecordBatchStream;
24use common_telemetry::{error, info};
25use object_store::ObjectStore;
26use snafu::{ensure, OptionExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_engine::{
29 RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
30 SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
31 SinglePartitionScanner, SyncManifestResponse,
32};
33use store_api::region_request::{
34 AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
35 RegionRequest,
36};
37use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
38use tokio::sync::Mutex;
39
40use crate::config::EngineConfig;
41use crate::error::{
42 RegionNotFoundSnafu, Result as EngineResult, UnexpectedEngineSnafu, UnsupportedSnafu,
43};
44use crate::region::{FileRegion, FileRegionRef};
45
46pub struct FileRegionEngine {
47 inner: EngineInnerRef,
48}
49
50impl FileRegionEngine {
51 pub fn new(_config: EngineConfig, object_store: ObjectStore) -> Self {
52 Self {
53 inner: Arc::new(EngineInner::new(object_store)),
54 }
55 }
56
57 async fn handle_query(
58 &self,
59 region_id: RegionId,
60 request: ScanRequest,
61 ) -> Result<SendableRecordBatchStream, BoxedError> {
62 self.inner
63 .get_region(region_id)
64 .await
65 .context(RegionNotFoundSnafu { region_id })
66 .map_err(BoxedError::new)?
67 .query(request)
68 .map_err(BoxedError::new)
69 }
70}
71
72#[async_trait]
73impl RegionEngine for FileRegionEngine {
74 fn name(&self) -> &str {
75 FILE_ENGINE
76 }
77
78 async fn handle_request(
79 &self,
80 region_id: RegionId,
81 request: RegionRequest,
82 ) -> Result<RegionResponse, BoxedError> {
83 self.inner
84 .handle_request(region_id, request)
85 .await
86 .map_err(BoxedError::new)
87 }
88
89 async fn handle_query(
90 &self,
91 region_id: RegionId,
92 request: ScanRequest,
93 ) -> Result<RegionScannerRef, BoxedError> {
94 let stream = self.handle_query(region_id, request).await?;
95 let metadata = self.get_metadata(region_id).await?;
96 let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
98 Ok(scanner)
99 }
100
101 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
102 self.inner
103 .get_region(region_id)
104 .await
105 .map(|r| r.metadata())
106 .context(RegionNotFoundSnafu { region_id })
107 .map_err(BoxedError::new)
108 }
109
110 async fn stop(&self) -> Result<(), BoxedError> {
111 self.inner.stop().await.map_err(BoxedError::new)
112 }
113
114 fn region_statistic(&self, _: RegionId) -> Option<RegionStatistic> {
115 None
116 }
117
118 async fn get_last_seq_num(&self, _: RegionId) -> Result<Option<SequenceNumber>, BoxedError> {
119 Ok(None)
120 }
121
122 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
123 self.inner
124 .set_region_role(region_id, role)
125 .map_err(BoxedError::new)
126 }
127
128 async fn set_region_role_state_gracefully(
129 &self,
130 region_id: RegionId,
131 _region_role_state: SettableRegionRoleState,
132 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
133 let exists = self.inner.get_region(region_id).await.is_some();
134
135 if exists {
136 Ok(SetRegionRoleStateResponse::success(
137 SetRegionRoleStateSuccess::file(),
138 ))
139 } else {
140 Ok(SetRegionRoleStateResponse::NotFound)
141 }
142 }
143
144 async fn sync_region(
145 &self,
146 _region_id: RegionId,
147 _manifest_info: RegionManifestInfo,
148 ) -> Result<SyncManifestResponse, BoxedError> {
149 Ok(SyncManifestResponse::NotSupported)
151 }
152
153 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
154 self.inner.state(region_id)
155 }
156
157 fn as_any(&self) -> &dyn Any {
158 self
159 }
160}
161
162struct EngineInner {
163 regions: RwLock<HashMap<RegionId, FileRegionRef>>,
167
168 region_mutex: Mutex<()>,
171
172 object_store: ObjectStore,
173}
174
175type EngineInnerRef = Arc<EngineInner>;
176
177impl EngineInner {
178 fn new(object_store: ObjectStore) -> Self {
179 Self {
180 regions: RwLock::new(HashMap::new()),
181 region_mutex: Mutex::new(()),
182 object_store,
183 }
184 }
185
186 async fn handle_request(
187 &self,
188 region_id: RegionId,
189 request: RegionRequest,
190 ) -> EngineResult<RegionResponse> {
191 let result = match request {
192 RegionRequest::Create(req) => self.handle_create(region_id, req).await,
193 RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
194 RegionRequest::Open(req) => self.handle_open(region_id, req).await,
195 RegionRequest::Close(req) => self.handle_close(region_id, req).await,
196 _ => UnsupportedSnafu {
197 operation: request.to_string(),
198 }
199 .fail(),
200 };
201 result.map(RegionResponse::new)
202 }
203
204 async fn stop(&self) -> EngineResult<()> {
205 let _lock = self.region_mutex.lock().await;
206 self.regions.write().unwrap().clear();
207 Ok(())
208 }
209
210 fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> {
211 Ok(())
213 }
214
215 fn state(&self, region_id: RegionId) -> Option<RegionRole> {
216 if self.regions.read().unwrap().get(®ion_id).is_some() {
217 Some(RegionRole::Leader)
218 } else {
219 None
220 }
221 }
222}
223
224impl EngineInner {
225 async fn handle_create(
226 &self,
227 region_id: RegionId,
228 request: RegionCreateRequest,
229 ) -> EngineResult<AffectedRows> {
230 ensure!(
231 request.engine == FILE_ENGINE,
232 UnexpectedEngineSnafu {
233 engine: request.engine
234 }
235 );
236
237 if self.exists(region_id).await {
238 return Ok(0);
239 }
240
241 info!("Try to create region, region_id: {}", region_id);
242
243 let _lock = self.region_mutex.lock().await;
244 if self.exists(region_id).await {
246 return Ok(0);
247 }
248
249 let res = FileRegion::create(region_id, request, &self.object_store).await;
250 let region = res.inspect_err(|err| {
251 error!(
252 err;
253 "Failed to create region, region_id: {}",
254 region_id
255 );
256 })?;
257 self.regions.write().unwrap().insert(region_id, region);
258
259 info!("A new region is created, region_id: {}", region_id);
260 Ok(0)
261 }
262
263 async fn handle_open(
264 &self,
265 region_id: RegionId,
266 request: RegionOpenRequest,
267 ) -> EngineResult<AffectedRows> {
268 if self.exists(region_id).await {
269 return Ok(0);
270 }
271
272 info!("Try to open region, region_id: {}", region_id);
273
274 let _lock = self.region_mutex.lock().await;
275 if self.exists(region_id).await {
277 return Ok(0);
278 }
279
280 let res = FileRegion::open(region_id, request, &self.object_store).await;
281 let region = res.inspect_err(|err| {
282 error!(
283 err;
284 "Failed to open region, region_id: {}",
285 region_id
286 );
287 })?;
288 self.regions.write().unwrap().insert(region_id, region);
289
290 info!("Region opened, region_id: {}", region_id);
291 Ok(0)
292 }
293
294 async fn handle_close(
295 &self,
296 region_id: RegionId,
297 _request: RegionCloseRequest,
298 ) -> EngineResult<AffectedRows> {
299 let _lock = self.region_mutex.lock().await;
300
301 let mut regions = self.regions.write().unwrap();
302 if regions.remove(®ion_id).is_some() {
303 info!("Region closed, region_id: {}", region_id);
304 }
305
306 Ok(0)
307 }
308
309 async fn handle_drop(
310 &self,
311 region_id: RegionId,
312 _request: RegionDropRequest,
313 ) -> EngineResult<AffectedRows> {
314 if !self.exists(region_id).await {
315 return RegionNotFoundSnafu { region_id }.fail();
316 }
317
318 info!("Try to drop region, region_id: {}", region_id);
319
320 let _lock = self.region_mutex.lock().await;
321
322 let region = self.get_region(region_id).await;
323 if let Some(region) = region {
324 let res = FileRegion::drop(®ion, &self.object_store).await;
325 res.inspect_err(|err| {
326 error!(
327 err;
328 "Failed to drop region, region_id: {}",
329 region_id
330 );
331 })?;
332 }
333 let _ = self.regions.write().unwrap().remove(®ion_id);
334
335 info!("Region dropped, region_id: {}", region_id);
336 Ok(0)
337 }
338
339 async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
340 self.regions.read().unwrap().get(®ion_id).cloned()
341 }
342
343 async fn exists(&self, region_id: RegionId) -> bool {
344 self.regions.read().unwrap().contains_key(®ion_id)
345 }
346}