file_engine/
region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use store_api::metadata::RegionMetadataRef;
21use store_api::region_request::{RegionCreateRequest, RegionOpenRequest};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::manifest::FileRegionManifest;
26use crate::FileOptions;
27
28#[derive(Debug)]
29pub struct FileRegion {
30    pub(crate) region_dir: String,
31    pub(crate) file_options: FileOptions,
32    pub(crate) url: String,
33    pub(crate) format: Format,
34    pub(crate) options: HashMap<String, String>,
35    pub(crate) metadata: RegionMetadataRef,
36}
37
38pub type FileRegionRef = Arc<FileRegion>;
39
40impl FileRegion {
41    pub async fn create(
42        region_id: RegionId,
43        request: RegionCreateRequest,
44        object_store: &ObjectStore,
45    ) -> Result<FileRegionRef> {
46        let manifest = FileRegionManifest {
47            region_id,
48            column_metadatas: request.column_metadatas.clone(),
49            primary_key: request.primary_key.clone(),
50            options: request.options,
51        };
52
53        let region_dir = request.region_dir;
54        let url = manifest.url()?;
55        let file_options = manifest.file_options()?;
56        let format = manifest.format()?;
57        let options = manifest.options.clone();
58        let metadata = manifest.metadata()?;
59
60        manifest.store(&region_dir, object_store).await?;
61
62        Ok(Arc::new(Self {
63            region_dir,
64            url,
65            file_options,
66            format,
67            options,
68            metadata,
69        }))
70    }
71
72    pub async fn open(
73        region_id: RegionId,
74        request: RegionOpenRequest,
75        object_store: &ObjectStore,
76    ) -> Result<FileRegionRef> {
77        let manifest =
78            FileRegionManifest::load(region_id, &request.region_dir, object_store).await?;
79
80        Ok(Arc::new(Self {
81            region_dir: request.region_dir,
82            url: manifest.url()?,
83            file_options: manifest.file_options()?,
84            format: manifest.format()?,
85            metadata: manifest.metadata()?,
86            options: manifest.options,
87        }))
88    }
89
90    pub async fn drop(&self, object_store: &ObjectStore) -> Result<()> {
91        FileRegionManifest::delete(self.metadata.region_id, &self.region_dir, object_store).await
92    }
93
94    pub fn metadata(&self) -> RegionMetadataRef {
95        self.metadata.clone()
96    }
97}
98
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use super::*;
    use crate::error::Error;
    use crate::test_util::{new_test_column_metadata, new_test_object_store, new_test_options};

    /// Builds the create request shared by all tests, rooted at `region_dir`.
    fn new_create_request(region_dir: &str) -> RegionCreateRequest {
        RegionCreateRequest {
            engine: "file".to_string(),
            column_metadatas: new_test_column_metadata(),
            primary_key: vec![1],
            options: new_test_options(),
            region_dir: region_dir.to_string(),
        }
    }

    /// Builds an open request for the region stored under `region_dir`.
    fn new_open_request(region_dir: &str) -> RegionOpenRequest {
        RegionOpenRequest {
            engine: "file".to_string(),
            region_dir: region_dir.to_string(),
            options: HashMap::default(),
            skip_wal_replay: false,
        }
    }

    /// Creating a region populates every field from the request, writes the
    /// manifest, and refuses to overwrite an existing manifest.
    #[tokio::test]
    async fn test_create_region() {
        let (_dir, object_store) = new_test_object_store("test_create_region");

        let request = new_create_request("create_region_dir/");
        let region_id = RegionId::new(1, 0);

        // Cloned here because the same request is reused for the second attempt below.
        let region = FileRegion::create(region_id, request.clone(), &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "create_region_dir/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);

        assert!(object_store
            .exists("create_region_dir/manifest/_file_manifest")
            .await
            .unwrap());

        // Object exists, should fail
        let err = FileRegion::create(region_id, request, &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ManifestExists { .. });
    }

    /// Opening a previously created region restores the same state from the manifest.
    #[tokio::test]
    async fn test_open_region() {
        let (_dir, object_store) = new_test_object_store("test_open_region");

        let region_dir = "open_region_dir/";
        let region_id = RegionId::new(1, 0);

        let _ = FileRegion::create(region_id, new_create_request(region_dir), &object_store)
            .await
            .unwrap();

        let region = FileRegion::open(region_id, new_open_request(region_dir), &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "open_region_dir/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);
    }

    /// Dropping a region removes its manifest, after which the region can no
    /// longer be opened.
    #[tokio::test]
    async fn test_drop_region() {
        let (_dir, object_store) = new_test_object_store("test_drop_region");

        let region_dir = "drop_region_dir/";
        let region_id = RegionId::new(1, 0);

        let region = FileRegion::create(region_id, new_create_request(region_dir), &object_store)
            .await
            .unwrap();

        assert!(object_store
            .exists("drop_region_dir/manifest/_file_manifest")
            .await
            .unwrap());

        // NOTE(review): presumably called in path form because `region` is an
        // `Arc`, where `region.drop(..)` could be resolved as `Drop::drop`.
        FileRegion::drop(&region, &object_store).await.unwrap();
        assert!(!object_store
            .exists("drop_region_dir/manifest/_file_manifest")
            .await
            .unwrap());

        let err = FileRegion::open(region_id, new_open_request(region_dir), &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::LoadRegionManifest { .. });
    }
}