1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use store_api::metadata::RegionMetadataRef;
21use store_api::path_utils::region_name;
22use store_api::region_request::{RegionCreateRequest, RegionOpenRequest};
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::manifest::FileRegionManifest;
27use crate::FileOptions;
28
29#[derive(Debug)]
30pub struct FileRegion {
31 pub(crate) region_dir: String,
32 pub(crate) file_options: FileOptions,
33 pub(crate) url: String,
34 pub(crate) format: Format,
35 pub(crate) options: HashMap<String, String>,
36 pub(crate) metadata: RegionMetadataRef,
37}
38
39pub type FileRegionRef = Arc<FileRegion>;
40
41impl FileRegion {
42 pub async fn create(
43 region_id: RegionId,
44 request: RegionCreateRequest,
45 object_store: &ObjectStore,
46 ) -> Result<FileRegionRef> {
47 let manifest = FileRegionManifest {
48 region_id,
49 column_metadatas: request.column_metadatas.clone(),
50 primary_key: request.primary_key.clone(),
51 options: request.options,
52 };
53
54 let region_dir = object_store::util::join_dir(
55 &request.table_dir,
56 ®ion_name(region_id.table_id(), region_id.region_sequence()),
57 );
58 let url = manifest.url()?;
59 let file_options = manifest.file_options()?;
60 let format = manifest.format()?;
61 let options = manifest.options.clone();
62 let metadata = manifest.metadata()?;
63
64 manifest.store(®ion_dir, object_store).await?;
65
66 Ok(Arc::new(Self {
67 region_dir,
68 url,
69 file_options,
70 format,
71 options,
72 metadata,
73 }))
74 }
75
76 pub async fn open(
77 region_id: RegionId,
78 request: RegionOpenRequest,
79 object_store: &ObjectStore,
80 ) -> Result<FileRegionRef> {
81 let region_dir = object_store::util::join_dir(
82 &request.table_dir,
83 ®ion_name(region_id.table_id(), region_id.region_sequence()),
84 );
85 let manifest = FileRegionManifest::load(region_id, ®ion_dir, object_store).await?;
86
87 Ok(Arc::new(Self {
88 region_dir,
89 url: manifest.url()?,
90 file_options: manifest.file_options()?,
91 format: manifest.format()?,
92 metadata: manifest.metadata()?,
93 options: manifest.options,
94 }))
95 }
96
97 pub async fn drop(&self, object_store: &ObjectStore) -> Result<()> {
98 FileRegionManifest::delete(self.metadata.region_id, &self.region_dir, object_store).await
99 }
100
101 pub fn metadata(&self) -> RegionMetadataRef {
102 self.metadata.clone()
103 }
104}
105
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::region_request::PathType;

    use super::*;
    use crate::error::Error;
    use crate::test_util::{new_test_column_metadata, new_test_object_store, new_test_options};

    /// Builds the create request shared by all tests, rooted at `table_dir`.
    fn new_create_request(table_dir: &str) -> RegionCreateRequest {
        RegionCreateRequest {
            engine: "file".to_string(),
            column_metadatas: new_test_column_metadata(),
            primary_key: vec![1],
            options: new_test_options(),
            table_dir: table_dir.to_string(),
            path_type: PathType::Bare,
            partition_expr_json: Some("".to_string()),
        }
    }

    /// Builds the open request shared by the tests, rooted at `table_dir`.
    fn new_open_request(table_dir: &str) -> RegionOpenRequest {
        RegionOpenRequest {
            engine: "file".to_string(),
            table_dir: table_dir.to_string(),
            path_type: PathType::Bare,
            options: HashMap::default(),
            skip_wal_replay: false,
            checkpoint: None,
        }
    }

    #[tokio::test]
    async fn test_create_region() {
        let (_dir, object_store) = new_test_object_store("test_create_region");

        let request = new_create_request("create_region_dir/");
        let region_id = RegionId::new(1, 0);

        // Cloned because the same request is reused below to provoke `ManifestExists`.
        let region = FileRegion::create(region_id, request.clone(), &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "create_region_dir/1_0000000000/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);

        assert!(object_store
            .exists("create_region_dir/1_0000000000/manifest/_file_manifest")
            .await
            .unwrap());

        // Creating the same region twice must fail because the manifest already exists.
        let err = FileRegion::create(region_id, request, &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ManifestExists { .. });
    }

    #[tokio::test]
    async fn test_open_region() {
        let (_dir, object_store) = new_test_object_store("test_open_region");

        let region_dir = "open_region_dir/";
        let region_id = RegionId::new(1, 0);

        let _ = FileRegion::create(region_id, new_create_request(region_dir), &object_store)
            .await
            .unwrap();

        let region = FileRegion::open(region_id, new_open_request(region_dir), &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "open_region_dir/1_0000000000/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);
    }

    #[tokio::test]
    async fn test_drop_region() {
        let (_dir, object_store) = new_test_object_store("test_drop_region");

        let region_dir = "drop_region_dir/";
        let region_id = RegionId::new(1, 0);

        let region = FileRegion::create(region_id, new_create_request(region_dir), &object_store)
            .await
            .unwrap();

        assert!(object_store
            .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
            .await
            .unwrap());

        // Fully qualified on purpose: `region.drop(..)` on the `Arc` handle would resolve
        // to the `Drop::drop` destructor instead of `FileRegion::drop`.
        FileRegion::drop(&region, &object_store).await.unwrap();
        assert!(!object_store
            .exists("drop_region_dir/1_0000000000/manifest/_file_manifest")
            .await
            .unwrap());

        // With the manifest gone, the region can no longer be opened.
        let err = FileRegion::open(region_id, new_open_request(region_dir), &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::LoadRegionManifest { .. });
    }
}