Skip to main content

file_engine/
region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use store_api::metadata::RegionMetadataRef;
21use store_api::path_utils::region_name;
22use store_api::region_request::{RegionCreateRequest, RegionOpenRequest};
23use store_api::storage::RegionId;
24
25use crate::FileOptions;
26use crate::error::Result;
27use crate::manifest::FileRegionManifest;
28
29#[derive(Debug)]
30pub struct FileRegion {
31    pub(crate) region_dir: String,
32    pub(crate) file_options: FileOptions,
33    pub(crate) url: String,
34    pub(crate) format: Format,
35    pub(crate) options: HashMap<String, String>,
36    pub(crate) metadata: RegionMetadataRef,
37}
38
39pub type FileRegionRef = Arc<FileRegion>;
40
41impl FileRegion {
42    pub async fn create(
43        region_id: RegionId,
44        request: RegionCreateRequest,
45        object_store: &ObjectStore,
46    ) -> Result<FileRegionRef> {
47        let manifest = FileRegionManifest {
48            region_id,
49            column_metadatas: request.column_metadatas.clone(),
50            primary_key: request.primary_key.clone(),
51            options: request.options,
52        };
53
54        let region_dir = object_store::util::join_dir(
55            &request.table_dir,
56            &region_name(region_id.table_id(), region_id.region_sequence()),
57        );
58        let url = manifest.url()?;
59        let file_options = manifest.file_options()?;
60        let format = manifest.format()?;
61        let options = manifest.options.clone();
62        let metadata = manifest.metadata()?;
63
64        manifest.store(&region_dir, object_store).await?;
65
66        Ok(Arc::new(Self {
67            region_dir,
68            url,
69            file_options,
70            format,
71            options,
72            metadata,
73        }))
74    }
75
76    pub async fn open(
77        region_id: RegionId,
78        request: RegionOpenRequest,
79        object_store: &ObjectStore,
80    ) -> Result<FileRegionRef> {
81        let region_dir = object_store::util::join_dir(
82            &request.table_dir,
83            &region_name(region_id.table_id(), region_id.region_sequence()),
84        );
85        let manifest = FileRegionManifest::load(region_id, &region_dir, object_store).await?;
86
87        Ok(Arc::new(Self {
88            region_dir,
89            url: manifest.url()?,
90            file_options: manifest.file_options()?,
91            format: manifest.format()?,
92            metadata: manifest.metadata()?,
93            options: manifest.options,
94        }))
95    }
96
97    pub async fn drop(&self, object_store: &ObjectStore) -> Result<()> {
98        FileRegionManifest::delete(self.metadata.region_id, &self.region_dir, object_store).await
99    }
100
101    pub fn metadata(&self) -> RegionMetadataRef {
102        self.metadata.clone()
103    }
104}
105
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use store_api::region_request::PathType;

    use super::*;
    use crate::error::Error;
    use crate::test_util::{new_test_column_metadata, new_test_object_store, new_test_options};

    /// Builds the create request used by every test, rooted at `table_dir`.
    fn new_create_request(table_dir: &str) -> RegionCreateRequest {
        RegionCreateRequest {
            engine: "file".to_string(),
            column_metadatas: new_test_column_metadata(),
            primary_key: vec![1],
            options: new_test_options(),
            table_dir: table_dir.to_string(),
            path_type: PathType::Bare,
            partition_expr_json: Some("".to_string()),
            requirements: Default::default(),
        }
    }

    /// Builds the open request used by the open/drop tests, rooted at `table_dir`.
    fn new_open_request(table_dir: &str) -> RegionOpenRequest {
        RegionOpenRequest {
            engine: "file".to_string(),
            table_dir: table_dir.to_string(),
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
            checkpoint: None,
            requirements: Default::default(),
        }
    }

    #[tokio::test]
    async fn test_create_region() {
        let (_dir, object_store) = new_test_object_store("test_create_region");

        let request = new_create_request("create_region_dir/");
        let region_id = RegionId::new(1, 0);

        // Cloned because the same request is reused for the second `create` below.
        let region = FileRegion::create(region_id, request.clone(), &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "create_region_dir/1_0000000000/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);

        assert!(
            object_store
                .exists("create_region_dir/1_0000000000/manifest/_file_manifest")
                .await
                .unwrap()
        );

        // Object exists, should fail
        let err = FileRegion::create(region_id, request, &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ManifestExists { .. });
    }

    #[tokio::test]
    async fn test_open_region() {
        let (_dir, object_store) = new_test_object_store("test_open_region");

        let table_dir = "open_region_dir/";
        let region_id = RegionId::new(1, 0);

        let _ = FileRegion::create(region_id, new_create_request(table_dir), &object_store)
            .await
            .unwrap();

        let region = FileRegion::open(region_id, new_open_request(table_dir), &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "open_region_dir/1_0000000000/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);
    }

    #[tokio::test]
    async fn test_drop_region() {
        let (_dir, object_store) = new_test_object_store("test_drop_region");

        let table_dir = "drop_region_dir/";
        let region_id = RegionId::new(1, 0);

        let region = FileRegion::create(region_id, new_create_request(table_dir), &object_store)
            .await
            .unwrap();

        assert!(
            object_store
                .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
                .await
                .unwrap()
        );

        // Called in path form on purpose: this avoids any ambiguity with
        // `Drop::drop` on the surrounding `Arc`.
        FileRegion::drop(&region, &object_store).await.unwrap();
        assert!(
            !object_store
                .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
                .await
                .unwrap()
        );

        // With the manifest gone, opening the region must fail.
        let err = FileRegion::open(region_id, new_open_request(table_dir), &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::LoadRegionManifest { .. });
    }
}