file_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::consts::FILE_ENGINE;
22use common_error::ext::BoxedError;
23use common_recordbatch::SendableRecordBatchStream;
24use common_telemetry::{error, info};
25use object_store::ObjectStore;
26use snafu::{ensure, OptionExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_engine::{
29    RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
30    SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
31    SinglePartitionScanner, SyncManifestResponse,
32};
33use store_api::region_request::{
34    AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
35    RegionRequest,
36};
37use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
38use tokio::sync::Mutex;
39
40use crate::config::EngineConfig;
41use crate::error::{
42    RegionNotFoundSnafu, Result as EngineResult, UnexpectedEngineSnafu, UnsupportedSnafu,
43};
44use crate::region::{FileRegion, FileRegionRef};
45
46pub struct FileRegionEngine {
47    inner: EngineInnerRef,
48}
49
50impl FileRegionEngine {
51    pub fn new(_config: EngineConfig, object_store: ObjectStore) -> Self {
52        Self {
53            inner: Arc::new(EngineInner::new(object_store)),
54        }
55    }
56
57    async fn handle_query(
58        &self,
59        region_id: RegionId,
60        request: ScanRequest,
61    ) -> Result<SendableRecordBatchStream, BoxedError> {
62        self.inner
63            .get_region(region_id)
64            .await
65            .context(RegionNotFoundSnafu { region_id })
66            .map_err(BoxedError::new)?
67            .query(request)
68            .map_err(BoxedError::new)
69    }
70}
71
72#[async_trait]
73impl RegionEngine for FileRegionEngine {
74    fn name(&self) -> &str {
75        FILE_ENGINE
76    }
77
78    async fn handle_request(
79        &self,
80        region_id: RegionId,
81        request: RegionRequest,
82    ) -> Result<RegionResponse, BoxedError> {
83        self.inner
84            .handle_request(region_id, request)
85            .await
86            .map_err(BoxedError::new)
87    }
88
89    async fn handle_query(
90        &self,
91        region_id: RegionId,
92        request: ScanRequest,
93    ) -> Result<RegionScannerRef, BoxedError> {
94        let stream = self.handle_query(region_id, request).await?;
95        let metadata = self.get_metadata(region_id).await?;
96        // We don't support enabling append mode for file engine.
97        let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
98        Ok(scanner)
99    }
100
101    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
102        self.inner
103            .get_region(region_id)
104            .await
105            .map(|r| r.metadata())
106            .context(RegionNotFoundSnafu { region_id })
107            .map_err(BoxedError::new)
108    }
109
110    async fn stop(&self) -> Result<(), BoxedError> {
111        self.inner.stop().await.map_err(BoxedError::new)
112    }
113
114    fn region_statistic(&self, _: RegionId) -> Option<RegionStatistic> {
115        None
116    }
117
118    async fn get_last_seq_num(&self, _: RegionId) -> Result<Option<SequenceNumber>, BoxedError> {
119        Ok(None)
120    }
121
122    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
123        self.inner
124            .set_region_role(region_id, role)
125            .map_err(BoxedError::new)
126    }
127
128    async fn set_region_role_state_gracefully(
129        &self,
130        region_id: RegionId,
131        _region_role_state: SettableRegionRoleState,
132    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
133        let exists = self.inner.get_region(region_id).await.is_some();
134
135        if exists {
136            Ok(SetRegionRoleStateResponse::success(
137                SetRegionRoleStateSuccess::file(),
138            ))
139        } else {
140            Ok(SetRegionRoleStateResponse::NotFound)
141        }
142    }
143
144    async fn sync_region(
145        &self,
146        _region_id: RegionId,
147        _manifest_info: RegionManifestInfo,
148    ) -> Result<SyncManifestResponse, BoxedError> {
149        // File engine doesn't need to sync region manifest.
150        Ok(SyncManifestResponse::NotSupported)
151    }
152
153    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
154        self.inner.state(region_id)
155    }
156
157    fn as_any(&self) -> &dyn Any {
158        self
159    }
160}
161
162struct EngineInner {
163    /// All regions opened by the engine.
164    ///
165    /// Writing to `regions` should also hold the `region_mutex`.
166    regions: RwLock<HashMap<RegionId, FileRegionRef>>,
167
168    /// Region mutex is used to protect the operations such as creating/opening/closing
169    /// a region, to avoid things like opening the same region simultaneously.
170    region_mutex: Mutex<()>,
171
172    object_store: ObjectStore,
173}
174
175type EngineInnerRef = Arc<EngineInner>;
176
177impl EngineInner {
178    fn new(object_store: ObjectStore) -> Self {
179        Self {
180            regions: RwLock::new(HashMap::new()),
181            region_mutex: Mutex::new(()),
182            object_store,
183        }
184    }
185
186    async fn handle_request(
187        &self,
188        region_id: RegionId,
189        request: RegionRequest,
190    ) -> EngineResult<RegionResponse> {
191        let result = match request {
192            RegionRequest::Create(req) => self.handle_create(region_id, req).await,
193            RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
194            RegionRequest::Open(req) => self.handle_open(region_id, req).await,
195            RegionRequest::Close(req) => self.handle_close(region_id, req).await,
196            _ => UnsupportedSnafu {
197                operation: request.to_string(),
198            }
199            .fail(),
200        };
201        result.map(RegionResponse::new)
202    }
203
204    async fn stop(&self) -> EngineResult<()> {
205        let _lock = self.region_mutex.lock().await;
206        self.regions.write().unwrap().clear();
207        Ok(())
208    }
209
210    fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> {
211        // TODO(zhongzc): Improve the semantics and implementation of this API.
212        Ok(())
213    }
214
215    fn state(&self, region_id: RegionId) -> Option<RegionRole> {
216        if self.regions.read().unwrap().get(&region_id).is_some() {
217            Some(RegionRole::Leader)
218        } else {
219            None
220        }
221    }
222}
223
224impl EngineInner {
225    async fn handle_create(
226        &self,
227        region_id: RegionId,
228        request: RegionCreateRequest,
229    ) -> EngineResult<AffectedRows> {
230        ensure!(
231            request.engine == FILE_ENGINE,
232            UnexpectedEngineSnafu {
233                engine: request.engine
234            }
235        );
236
237        if self.exists(region_id).await {
238            return Ok(0);
239        }
240
241        info!("Try to create region, region_id: {}", region_id);
242
243        let _lock = self.region_mutex.lock().await;
244        // Check again after acquiring the lock
245        if self.exists(region_id).await {
246            return Ok(0);
247        }
248
249        let res = FileRegion::create(region_id, request, &self.object_store).await;
250        let region = res.inspect_err(|err| {
251            error!(
252                err;
253                "Failed to create region, region_id: {}",
254                region_id
255            );
256        })?;
257        self.regions.write().unwrap().insert(region_id, region);
258
259        info!("A new region is created, region_id: {}", region_id);
260        Ok(0)
261    }
262
263    async fn handle_open(
264        &self,
265        region_id: RegionId,
266        request: RegionOpenRequest,
267    ) -> EngineResult<AffectedRows> {
268        if self.exists(region_id).await {
269            return Ok(0);
270        }
271
272        info!("Try to open region, region_id: {}", region_id);
273
274        let _lock = self.region_mutex.lock().await;
275        // Check again after acquiring the lock
276        if self.exists(region_id).await {
277            return Ok(0);
278        }
279
280        let res = FileRegion::open(region_id, request, &self.object_store).await;
281        let region = res.inspect_err(|err| {
282            error!(
283                err;
284                "Failed to open region, region_id: {}",
285                region_id
286            );
287        })?;
288        self.regions.write().unwrap().insert(region_id, region);
289
290        info!("Region opened, region_id: {}", region_id);
291        Ok(0)
292    }
293
294    async fn handle_close(
295        &self,
296        region_id: RegionId,
297        _request: RegionCloseRequest,
298    ) -> EngineResult<AffectedRows> {
299        let _lock = self.region_mutex.lock().await;
300
301        let mut regions = self.regions.write().unwrap();
302        if regions.remove(&region_id).is_some() {
303            info!("Region closed, region_id: {}", region_id);
304        }
305
306        Ok(0)
307    }
308
309    async fn handle_drop(
310        &self,
311        region_id: RegionId,
312        _request: RegionDropRequest,
313    ) -> EngineResult<AffectedRows> {
314        if !self.exists(region_id).await {
315            return RegionNotFoundSnafu { region_id }.fail();
316        }
317
318        info!("Try to drop region, region_id: {}", region_id);
319
320        let _lock = self.region_mutex.lock().await;
321
322        let region = self.get_region(region_id).await;
323        if let Some(region) = region {
324            let res = FileRegion::drop(&region, &self.object_store).await;
325            res.inspect_err(|err| {
326                error!(
327                    err;
328                    "Failed to drop region, region_id: {}",
329                    region_id
330                );
331            })?;
332        }
333        let _ = self.regions.write().unwrap().remove(&region_id);
334
335        info!("Region dropped, region_id: {}", region_id);
336        Ok(0)
337    }
338
339    async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
340        self.regions.read().unwrap().get(&region_id).cloned()
341    }
342
343    async fn exists(&self, region_id: RegionId) -> bool {
344        self.regions.read().unwrap().contains_key(&region_id)
345    }
346}