file_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::consts::FILE_ENGINE;
22use common_error::ext::BoxedError;
23use common_recordbatch::SendableRecordBatchStream;
24use common_telemetry::{error, info};
25use object_store::ObjectStore;
26use snafu::{OptionExt, ensure};
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_engine::{
29    RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
30    RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
31    SetRegionRoleStateSuccess, SettableRegionRoleState, SinglePartitionScanner,
32    SyncManifestResponse,
33};
34use store_api::region_request::{
35    AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
36    RegionRequest,
37};
38use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
39use tokio::sync::Mutex;
40
41use crate::config::EngineConfig;
42use crate::error::{
43    RegionNotFoundSnafu, Result as EngineResult, UnexpectedEngineSnafu, UnsupportedSnafu,
44};
45use crate::region::{FileRegion, FileRegionRef};
46
47pub struct FileRegionEngine {
48    inner: EngineInnerRef,
49}
50
51impl FileRegionEngine {
52    pub fn new(_config: EngineConfig, object_store: ObjectStore) -> Self {
53        Self {
54            inner: Arc::new(EngineInner::new(object_store)),
55        }
56    }
57
58    async fn handle_query(
59        &self,
60        region_id: RegionId,
61        request: ScanRequest,
62    ) -> Result<SendableRecordBatchStream, BoxedError> {
63        self.inner
64            .get_region(region_id)
65            .await
66            .context(RegionNotFoundSnafu { region_id })
67            .map_err(BoxedError::new)?
68            .query(request)
69            .map_err(BoxedError::new)
70    }
71}
72
73#[async_trait]
74impl RegionEngine for FileRegionEngine {
75    fn name(&self) -> &str {
76        FILE_ENGINE
77    }
78
79    async fn handle_request(
80        &self,
81        region_id: RegionId,
82        request: RegionRequest,
83    ) -> Result<RegionResponse, BoxedError> {
84        self.inner
85            .handle_request(region_id, request)
86            .await
87            .map_err(BoxedError::new)
88    }
89
90    async fn handle_query(
91        &self,
92        region_id: RegionId,
93        request: ScanRequest,
94    ) -> Result<RegionScannerRef, BoxedError> {
95        let stream = self.handle_query(region_id, request).await?;
96        let metadata = self.get_metadata(region_id).await?;
97        // We don't support enabling append mode for file engine.
98        let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
99        Ok(scanner)
100    }
101
102    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
103        self.inner
104            .get_region(region_id)
105            .await
106            .map(|r| r.metadata())
107            .context(RegionNotFoundSnafu { region_id })
108            .map_err(BoxedError::new)
109    }
110
111    async fn stop(&self) -> Result<(), BoxedError> {
112        self.inner.stop().await.map_err(BoxedError::new)
113    }
114
115    fn region_statistic(&self, _: RegionId) -> Option<RegionStatistic> {
116        None
117    }
118
119    async fn get_committed_sequence(&self, _: RegionId) -> Result<SequenceNumber, BoxedError> {
120        Ok(Default::default())
121    }
122
123    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
124        self.inner
125            .set_region_role(region_id, role)
126            .map_err(BoxedError::new)
127    }
128
129    async fn set_region_role_state_gracefully(
130        &self,
131        region_id: RegionId,
132        _region_role_state: SettableRegionRoleState,
133    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
134        let exists = self.inner.get_region(region_id).await.is_some();
135
136        if exists {
137            Ok(SetRegionRoleStateResponse::success(
138                SetRegionRoleStateSuccess::file(),
139            ))
140        } else {
141            Ok(SetRegionRoleStateResponse::NotFound)
142        }
143    }
144
145    async fn sync_region(
146        &self,
147        _region_id: RegionId,
148        _manifest_info: RegionManifestInfo,
149    ) -> Result<SyncManifestResponse, BoxedError> {
150        // File engine doesn't need to sync region manifest.
151        Ok(SyncManifestResponse::NotSupported)
152    }
153
154    async fn remap_manifests(
155        &self,
156        _request: RemapManifestsRequest,
157    ) -> Result<RemapManifestsResponse, BoxedError> {
158        Err(BoxedError::new(
159            UnsupportedSnafu {
160                operation: "remap_manifests",
161            }
162            .build(),
163        ))
164    }
165
166    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
167        self.inner.state(region_id)
168    }
169
170    fn as_any(&self) -> &dyn Any {
171        self
172    }
173}
174
175struct EngineInner {
176    /// All regions opened by the engine.
177    ///
178    /// Writing to `regions` should also hold the `region_mutex`.
179    regions: RwLock<HashMap<RegionId, FileRegionRef>>,
180
181    /// Region mutex is used to protect the operations such as creating/opening/closing
182    /// a region, to avoid things like opening the same region simultaneously.
183    region_mutex: Mutex<()>,
184
185    object_store: ObjectStore,
186}
187
188type EngineInnerRef = Arc<EngineInner>;
189
190impl EngineInner {
191    fn new(object_store: ObjectStore) -> Self {
192        Self {
193            regions: RwLock::new(HashMap::new()),
194            region_mutex: Mutex::new(()),
195            object_store,
196        }
197    }
198
199    async fn handle_request(
200        &self,
201        region_id: RegionId,
202        request: RegionRequest,
203    ) -> EngineResult<RegionResponse> {
204        let result = match request {
205            RegionRequest::Create(req) => self.handle_create(region_id, req).await,
206            RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
207            RegionRequest::Open(req) => self.handle_open(region_id, req).await,
208            RegionRequest::Close(req) => self.handle_close(region_id, req).await,
209            _ => UnsupportedSnafu {
210                operation: request.to_string(),
211            }
212            .fail(),
213        };
214        result.map(RegionResponse::new)
215    }
216
217    async fn stop(&self) -> EngineResult<()> {
218        let _lock = self.region_mutex.lock().await;
219        self.regions.write().unwrap().clear();
220        Ok(())
221    }
222
223    fn set_region_role(&self, _region_id: RegionId, _region_role: RegionRole) -> EngineResult<()> {
224        // TODO(zhongzc): Improve the semantics and implementation of this API.
225        Ok(())
226    }
227
228    fn state(&self, region_id: RegionId) -> Option<RegionRole> {
229        if self.regions.read().unwrap().get(&region_id).is_some() {
230            Some(RegionRole::Leader)
231        } else {
232            None
233        }
234    }
235}
236
237impl EngineInner {
238    async fn handle_create(
239        &self,
240        region_id: RegionId,
241        request: RegionCreateRequest,
242    ) -> EngineResult<AffectedRows> {
243        ensure!(
244            request.engine == FILE_ENGINE,
245            UnexpectedEngineSnafu {
246                engine: request.engine
247            }
248        );
249
250        if self.exists(region_id).await {
251            return Ok(0);
252        }
253
254        info!("Try to create region, region_id: {}", region_id);
255
256        let _lock = self.region_mutex.lock().await;
257        // Check again after acquiring the lock
258        if self.exists(region_id).await {
259            return Ok(0);
260        }
261
262        let res = FileRegion::create(region_id, request, &self.object_store).await;
263        let region = res.inspect_err(|err| {
264            error!(
265                err;
266                "Failed to create region, region_id: {}",
267                region_id
268            );
269        })?;
270        self.regions.write().unwrap().insert(region_id, region);
271
272        info!("A new region is created, region_id: {}", region_id);
273        Ok(0)
274    }
275
276    async fn handle_open(
277        &self,
278        region_id: RegionId,
279        request: RegionOpenRequest,
280    ) -> EngineResult<AffectedRows> {
281        if self.exists(region_id).await {
282            return Ok(0);
283        }
284
285        info!("Try to open region, region_id: {}", region_id);
286
287        let _lock = self.region_mutex.lock().await;
288        // Check again after acquiring the lock
289        if self.exists(region_id).await {
290            return Ok(0);
291        }
292
293        let res = FileRegion::open(region_id, request, &self.object_store).await;
294        let region = res.inspect_err(|err| {
295            error!(
296                err;
297                "Failed to open region, region_id: {}",
298                region_id
299            );
300        })?;
301        self.regions.write().unwrap().insert(region_id, region);
302
303        info!("Region opened, region_id: {}", region_id);
304        Ok(0)
305    }
306
307    async fn handle_close(
308        &self,
309        region_id: RegionId,
310        _request: RegionCloseRequest,
311    ) -> EngineResult<AffectedRows> {
312        let _lock = self.region_mutex.lock().await;
313
314        let mut regions = self.regions.write().unwrap();
315        if regions.remove(&region_id).is_some() {
316            info!("Region closed, region_id: {}", region_id);
317        }
318
319        Ok(0)
320    }
321
322    async fn handle_drop(
323        &self,
324        region_id: RegionId,
325        _request: RegionDropRequest,
326    ) -> EngineResult<AffectedRows> {
327        if !self.exists(region_id).await {
328            return RegionNotFoundSnafu { region_id }.fail();
329        }
330
331        info!("Try to drop region, region_id: {}", region_id);
332
333        let _lock = self.region_mutex.lock().await;
334
335        let region = self.get_region(region_id).await;
336        if let Some(region) = region {
337            let res = FileRegion::drop(&region, &self.object_store).await;
338            res.inspect_err(|err| {
339                error!(
340                    err;
341                    "Failed to drop region, region_id: {}",
342                    region_id
343                );
344            })?;
345        }
346        let _ = self.regions.write().unwrap().remove(&region_id);
347
348        info!("Region dropped, region_id: {}", region_id);
349        Ok(0)
350    }
351
352    async fn get_region(&self, region_id: RegionId) -> Option<FileRegionRef> {
353        self.regions.read().unwrap().get(&region_id).cloned()
354    }
355
356    async fn exists(&self, region_id: RegionId) -> bool {
357        self.regions.read().unwrap().contains_key(&region_id)
358    }
359}