file_engine/
region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use store_api::metadata::RegionMetadataRef;
21use store_api::path_utils::region_name;
22use store_api::region_request::{RegionCreateRequest, RegionOpenRequest};
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::manifest::FileRegionManifest;
27use crate::FileOptions;
28
29#[derive(Debug)]
30pub struct FileRegion {
31    pub(crate) region_dir: String,
32    pub(crate) file_options: FileOptions,
33    pub(crate) url: String,
34    pub(crate) format: Format,
35    pub(crate) options: HashMap<String, String>,
36    pub(crate) metadata: RegionMetadataRef,
37}
38
39pub type FileRegionRef = Arc<FileRegion>;
40
41impl FileRegion {
42    pub async fn create(
43        region_id: RegionId,
44        request: RegionCreateRequest,
45        object_store: &ObjectStore,
46    ) -> Result<FileRegionRef> {
47        let manifest = FileRegionManifest {
48            region_id,
49            column_metadatas: request.column_metadatas.clone(),
50            primary_key: request.primary_key.clone(),
51            options: request.options,
52        };
53
54        let region_dir = object_store::util::join_dir(
55            &request.table_dir,
56            &region_name(region_id.table_id(), region_id.region_sequence()),
57        );
58        let url = manifest.url()?;
59        let file_options = manifest.file_options()?;
60        let format = manifest.format()?;
61        let options = manifest.options.clone();
62        let metadata = manifest.metadata()?;
63
64        manifest.store(&region_dir, object_store).await?;
65
66        Ok(Arc::new(Self {
67            region_dir,
68            url,
69            file_options,
70            format,
71            options,
72            metadata,
73        }))
74    }
75
76    pub async fn open(
77        region_id: RegionId,
78        request: RegionOpenRequest,
79        object_store: &ObjectStore,
80    ) -> Result<FileRegionRef> {
81        let region_dir = object_store::util::join_dir(
82            &request.table_dir,
83            &region_name(region_id.table_id(), region_id.region_sequence()),
84        );
85        let manifest = FileRegionManifest::load(region_id, &region_dir, object_store).await?;
86
87        Ok(Arc::new(Self {
88            region_dir,
89            url: manifest.url()?,
90            file_options: manifest.file_options()?,
91            format: manifest.format()?,
92            metadata: manifest.metadata()?,
93            options: manifest.options,
94        }))
95    }
96
97    pub async fn drop(&self, object_store: &ObjectStore) -> Result<()> {
98        FileRegionManifest::delete(self.metadata.region_id, &self.region_dir, object_store).await
99    }
100
101    pub fn metadata(&self) -> RegionMetadataRef {
102        self.metadata.clone()
103    }
104}
105
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::region_request::PathType;

    use super::*;
    use crate::error::Error;
    use crate::test_util::{new_test_column_metadata, new_test_object_store, new_test_options};

    /// Builds the create request used by every test, rooted at `table_dir`.
    fn create_request(table_dir: &str) -> RegionCreateRequest {
        RegionCreateRequest {
            engine: "file".to_string(),
            column_metadatas: new_test_column_metadata(),
            primary_key: vec![1],
            options: new_test_options(),
            table_dir: table_dir.to_string(),
            path_type: PathType::Bare,
            partition_expr_json: Some("".to_string()),
        }
    }

    /// Builds the open request used by the tests, rooted at `table_dir`.
    fn open_request(table_dir: &str) -> RegionOpenRequest {
        RegionOpenRequest {
            engine: "file".to_string(),
            table_dir: table_dir.to_string(),
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
            checkpoint: None,
        }
    }

    /// Checks that `region` carries the values produced by the test fixtures.
    fn assert_region(region: &FileRegion, expected_dir: &str, region_id: RegionId) {
        assert_eq!(region.region_dir, expected_dir);
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);
    }

    #[tokio::test]
    async fn test_create_region() {
        let (_dir, object_store) = new_test_object_store("test_create_region");

        let request = create_request("create_region_dir/");
        let region_id = RegionId::new(1, 0);

        let created = FileRegion::create(region_id, request.clone(), &object_store)
            .await
            .unwrap();
        assert_region(&created, "create_region_dir/1_0000000000/", region_id);

        let manifest_present = object_store
            .exists("create_region_dir/1_0000000000/manifest/_file_manifest")
            .await
            .unwrap();
        assert!(manifest_present);

        // Creating the same region a second time must be rejected.
        let second_attempt = FileRegion::create(region_id, request, &object_store).await;
        assert_matches!(second_attempt.unwrap_err(), Error::ManifestExists { .. });
    }

    #[tokio::test]
    async fn test_open_region() {
        let (_dir, object_store) = new_test_object_store("test_open_region");

        let table_dir = "open_region_dir/";
        let region_id = RegionId::new(1, 0);

        let _ = FileRegion::create(region_id, create_request(table_dir), &object_store)
            .await
            .unwrap();

        let opened = FileRegion::open(region_id, open_request(table_dir), &object_store)
            .await
            .unwrap();
        assert_region(&opened, "open_region_dir/1_0000000000/", region_id);
    }

    #[tokio::test]
    async fn test_drop_region() {
        let (_dir, object_store) = new_test_object_store("test_drop_region");

        let table_dir = "drop_region_dir/";
        let region_id = RegionId::new(1, 0);

        let region = FileRegion::create(region_id, create_request(table_dir), &object_store)
            .await
            .unwrap();

        let present_before = object_store
            .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
            .await
            .unwrap();
        assert!(present_before);

        FileRegion::drop(&region, &object_store).await.unwrap();

        let present_after = object_store
            .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
            .await
            .unwrap();
        assert!(!present_after);

        // With the manifest gone, opening the region has to fail.
        let reopen = FileRegion::open(region_id, open_request(table_dir), &object_store).await;
        assert_matches!(reopen.unwrap_err(), Error::LoadRegionManifest { .. });
    }
}