1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use store_api::metadata::RegionMetadataRef;
21use store_api::path_utils::region_name;
22use store_api::region_request::{RegionCreateRequest, RegionOpenRequest};
23use store_api::storage::RegionId;
24
25use crate::FileOptions;
26use crate::error::Result;
27use crate::manifest::FileRegionManifest;
28
29#[derive(Debug)]
30pub struct FileRegion {
31 pub(crate) region_dir: String,
32 pub(crate) file_options: FileOptions,
33 pub(crate) url: String,
34 pub(crate) format: Format,
35 pub(crate) options: HashMap<String, String>,
36 pub(crate) metadata: RegionMetadataRef,
37}
38
39pub type FileRegionRef = Arc<FileRegion>;
40
41impl FileRegion {
42 pub async fn create(
43 region_id: RegionId,
44 request: RegionCreateRequest,
45 object_store: &ObjectStore,
46 ) -> Result<FileRegionRef> {
47 let manifest = FileRegionManifest {
48 region_id,
49 column_metadatas: request.column_metadatas.clone(),
50 primary_key: request.primary_key.clone(),
51 options: request.options,
52 };
53
54 let region_dir = object_store::util::join_dir(
55 &request.table_dir,
56 ®ion_name(region_id.table_id(), region_id.region_sequence()),
57 );
58 let url = manifest.url()?;
59 let file_options = manifest.file_options()?;
60 let format = manifest.format()?;
61 let options = manifest.options.clone();
62 let metadata = manifest.metadata()?;
63
64 manifest.store(®ion_dir, object_store).await?;
65
66 Ok(Arc::new(Self {
67 region_dir,
68 url,
69 file_options,
70 format,
71 options,
72 metadata,
73 }))
74 }
75
76 pub async fn open(
77 region_id: RegionId,
78 request: RegionOpenRequest,
79 object_store: &ObjectStore,
80 ) -> Result<FileRegionRef> {
81 let region_dir = object_store::util::join_dir(
82 &request.table_dir,
83 ®ion_name(region_id.table_id(), region_id.region_sequence()),
84 );
85 let manifest = FileRegionManifest::load(region_id, ®ion_dir, object_store).await?;
86
87 Ok(Arc::new(Self {
88 region_dir,
89 url: manifest.url()?,
90 file_options: manifest.file_options()?,
91 format: manifest.format()?,
92 metadata: manifest.metadata()?,
93 options: manifest.options,
94 }))
95 }
96
97 pub async fn drop(&self, object_store: &ObjectStore) -> Result<()> {
98 FileRegionManifest::delete(self.metadata.region_id, &self.region_dir, object_store).await
99 }
100
101 pub fn metadata(&self) -> RegionMetadataRef {
102 self.metadata.clone()
103 }
104}
105
106#[cfg(test)]
107mod tests {
108 use std::assert_matches;
109
110 use store_api::region_request::PathType;
111
112 use super::*;
113 use crate::error::Error;
114 use crate::test_util::{new_test_column_metadata, new_test_object_store, new_test_options};
115
116 #[tokio::test]
117 async fn test_create_region() {
118 let (_dir, object_store) = new_test_object_store("test_create_region");
119
120 let request = RegionCreateRequest {
121 engine: "file".to_string(),
122 column_metadatas: new_test_column_metadata(),
123 primary_key: vec![1],
124 options: new_test_options(),
125 table_dir: "create_region_dir/".to_string(),
126 path_type: PathType::Bare,
127 partition_expr_json: Some("".to_string()),
128 requirements: Default::default(),
129 };
130 let region_id = RegionId::new(1, 0);
131
132 let region = FileRegion::create(region_id, request.clone(), &object_store)
133 .await
134 .unwrap();
135
136 assert_eq!(region.region_dir, "create_region_dir/1_0000000000/");
137 assert_eq!(region.url, "test");
138 assert_eq!(region.file_options.files, vec!["1.csv"]);
139 assert_matches!(region.format, Format::Csv { .. });
140 assert_eq!(region.options, new_test_options());
141 assert_eq!(region.metadata.region_id, region_id);
142 assert_eq!(region.metadata.primary_key, vec![1]);
143
144 assert!(
145 object_store
146 .exists("create_region_dir/1_0000000000/manifest/_file_manifest")
147 .await
148 .unwrap()
149 );
150
151 let err = FileRegion::create(region_id, request, &object_store)
153 .await
154 .unwrap_err();
155 assert_matches!(err, Error::ManifestExists { .. });
156 }
157
158 #[tokio::test]
159 async fn test_open_region() {
160 let (_dir, object_store) = new_test_object_store("test_open_region");
161
162 let region_dir = "open_region_dir/".to_string();
163 let request = RegionCreateRequest {
164 engine: "file".to_string(),
165 column_metadatas: new_test_column_metadata(),
166 primary_key: vec![1],
167 options: new_test_options(),
168 table_dir: region_dir.clone(),
169 path_type: PathType::Bare,
170 partition_expr_json: Some("".to_string()),
171 requirements: Default::default(),
172 };
173 let region_id = RegionId::new(1, 0);
174
175 let _ = FileRegion::create(region_id, request.clone(), &object_store)
176 .await
177 .unwrap();
178
179 let request = RegionOpenRequest {
180 engine: "file".to_string(),
181 table_dir: region_dir,
182 path_type: PathType::Bare,
183 options: HashMap::default(),
184 skip_wal_replay: false,
185 checkpoint: None,
186 requirements: Default::default(),
187 };
188
189 let region = FileRegion::open(region_id, request, &object_store)
190 .await
191 .unwrap();
192
193 assert_eq!(region.region_dir, "open_region_dir/1_0000000000/");
194 assert_eq!(region.url, "test");
195 assert_eq!(region.file_options.files, vec!["1.csv"]);
196 assert_matches!(region.format, Format::Csv { .. });
197 assert_eq!(region.options, new_test_options());
198 assert_eq!(region.metadata.region_id, region_id);
199 assert_eq!(region.metadata.primary_key, vec![1]);
200 }
201
202 #[tokio::test]
203 async fn test_drop_region() {
204 let (_dir, object_store) = new_test_object_store("test_drop_region");
205
206 let region_dir = "drop_region_dir/".to_string();
207 let request = RegionCreateRequest {
208 engine: "file".to_string(),
209 column_metadatas: new_test_column_metadata(),
210 primary_key: vec![1],
211 options: new_test_options(),
212 table_dir: region_dir.clone(),
213 path_type: PathType::Bare,
214 partition_expr_json: Some("".to_string()),
215 requirements: Default::default(),
216 };
217 let region_id = RegionId::new(1, 0);
218
219 let region = FileRegion::create(region_id, request.clone(), &object_store)
220 .await
221 .unwrap();
222
223 assert!(
224 object_store
225 .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
226 .await
227 .unwrap()
228 );
229
230 FileRegion::drop(®ion, &object_store).await.unwrap();
231 assert!(
232 !object_store
233 .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
234 .await
235 .unwrap()
236 );
237
238 let request = RegionOpenRequest {
239 engine: "file".to_string(),
240 table_dir: region_dir,
241 path_type: PathType::Bare,
242 options: HashMap::default(),
243 skip_wal_replay: false,
244 checkpoint: None,
245 requirements: Default::default(),
246 };
247 let err = FileRegion::open(region_id, request, &object_store)
248 .await
249 .unwrap_err();
250 assert_matches!(err, Error::LoadRegionManifest { .. });
251 }
252}