1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_datasource::file_format::Format;
19use object_store::ObjectStore;
20use store_api::metadata::RegionMetadataRef;
21use store_api::region_request::{RegionCreateRequest, RegionOpenRequest};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::manifest::FileRegionManifest;
26use crate::FileOptions;
27
/// A region whose state is derived from a [`FileRegionManifest`].
///
/// Instances are built by [`FileRegion::create`] and [`FileRegion::open`], which
/// populate every field from the manifest and the request's region directory.
#[derive(Debug)]
pub struct FileRegion {
    /// Region directory taken from the create/open request; the manifest is
    /// stored, loaded and deleted relative to it.
    pub(crate) region_dir: String,
    /// File options obtained from the region manifest.
    pub(crate) file_options: FileOptions,
    /// URL obtained from the region manifest.
    pub(crate) url: String,
    /// File format obtained from the region manifest.
    pub(crate) format: Format,
    /// Region options as recorded in the manifest.
    pub(crate) options: HashMap<String, String>,
    /// Region metadata derived from the manifest.
    pub(crate) metadata: RegionMetadataRef,
}
37
/// Shared, reference-counted handle to a [`FileRegion`].
pub type FileRegionRef = Arc<FileRegion>;
39
40impl FileRegion {
41 pub async fn create(
42 region_id: RegionId,
43 request: RegionCreateRequest,
44 object_store: &ObjectStore,
45 ) -> Result<FileRegionRef> {
46 let manifest = FileRegionManifest {
47 region_id,
48 column_metadatas: request.column_metadatas.clone(),
49 primary_key: request.primary_key.clone(),
50 options: request.options,
51 };
52
53 let region_dir = request.region_dir;
54 let url = manifest.url()?;
55 let file_options = manifest.file_options()?;
56 let format = manifest.format()?;
57 let options = manifest.options.clone();
58 let metadata = manifest.metadata()?;
59
60 manifest.store(®ion_dir, object_store).await?;
61
62 Ok(Arc::new(Self {
63 region_dir,
64 url,
65 file_options,
66 format,
67 options,
68 metadata,
69 }))
70 }
71
72 pub async fn open(
73 region_id: RegionId,
74 request: RegionOpenRequest,
75 object_store: &ObjectStore,
76 ) -> Result<FileRegionRef> {
77 let manifest =
78 FileRegionManifest::load(region_id, &request.region_dir, object_store).await?;
79
80 Ok(Arc::new(Self {
81 region_dir: request.region_dir,
82 url: manifest.url()?,
83 file_options: manifest.file_options()?,
84 format: manifest.format()?,
85 metadata: manifest.metadata()?,
86 options: manifest.options,
87 }))
88 }
89
90 pub async fn drop(&self, object_store: &ObjectStore) -> Result<()> {
91 FileRegionManifest::delete(self.metadata.region_id, &self.region_dir, object_store).await
92 }
93
94 pub fn metadata(&self) -> RegionMetadataRef {
95 self.metadata.clone()
96 }
97}
98
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use super::*;
    use crate::error::Error;
    use crate::test_util::{new_test_column_metadata, new_test_object_store, new_test_options};

    /// Creating a region populates all fields, writes the manifest file, and a
    /// second create on the same directory is rejected.
    #[tokio::test]
    async fn test_create_region() {
        let (_dir, object_store) = new_test_object_store("test_create_region");

        let request = RegionCreateRequest {
            engine: "file".to_string(),
            column_metadatas: new_test_column_metadata(),
            primary_key: vec![1],
            options: new_test_options(),
            region_dir: "create_region_dir/".to_string(),
        };
        let region_id = RegionId::new(1, 0);

        // The request is reused for the duplicate-create check below, hence the clone.
        let region = FileRegion::create(region_id, request.clone(), &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "create_region_dir/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);

        assert!(object_store
            .exists("create_region_dir/manifest/_file_manifest")
            .await
            .unwrap());

        let err = FileRegion::create(region_id, request, &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ManifestExists { .. });
    }

    /// A region opened from a previously created manifest exposes the same
    /// state as the one that was created.
    #[tokio::test]
    async fn test_open_region() {
        let (_dir, object_store) = new_test_object_store("test_open_region");

        let region_dir = "open_region_dir/".to_string();
        let request = RegionCreateRequest {
            engine: "file".to_string(),
            column_metadatas: new_test_column_metadata(),
            primary_key: vec![1],
            options: new_test_options(),
            region_dir: region_dir.clone(),
        };
        let region_id = RegionId::new(1, 0);

        let _ = FileRegion::create(region_id, request, &object_store)
            .await
            .unwrap();

        let request = RegionOpenRequest {
            engine: "file".to_string(),
            region_dir,
            options: HashMap::default(),
            skip_wal_replay: false,
        };

        let region = FileRegion::open(region_id, request, &object_store)
            .await
            .unwrap();

        assert_eq!(region.region_dir, "open_region_dir/");
        assert_eq!(region.url, "test");
        assert_eq!(region.file_options.files, vec!["1.csv"]);
        assert_matches!(region.format, Format::Csv { .. });
        assert_eq!(region.options, new_test_options());
        assert_eq!(region.metadata.region_id, region_id);
        assert_eq!(region.metadata.primary_key, vec![1]);
    }

    /// Dropping a region removes its manifest file, after which opening the
    /// region fails.
    #[tokio::test]
    async fn test_drop_region() {
        let (_dir, object_store) = new_test_object_store("test_drop_region");

        let region_dir = "drop_region_dir/".to_string();
        let request = RegionCreateRequest {
            engine: "file".to_string(),
            column_metadatas: new_test_column_metadata(),
            primary_key: vec![1],
            options: new_test_options(),
            region_dir: region_dir.clone(),
        };
        let region_id = RegionId::new(1, 0);

        let region = FileRegion::create(region_id, request, &object_store)
            .await
            .unwrap();

        assert!(object_store
            .exists("drop_region_dir/manifest/_file_manifest")
            .await
            .unwrap());

        // Fully-qualified call so this resolves to `FileRegion::drop` rather
        // than `Drop::drop` on the surrounding `Arc`.
        FileRegion::drop(&region, &object_store).await.unwrap();
        assert!(!object_store
            .exists("drop_region_dir/manifest/_file_manifest")
            .await
            .unwrap());

        let request = RegionOpenRequest {
            engine: "file".to_string(),
            region_dir,
            options: HashMap::default(),
            skip_wal_replay: false,
        };
        let err = FileRegion::open(region_id, request, &object_store)
            .await
            .unwrap_err();
        assert_matches!(err, Error::LoadRegionManifest { .. });
    }
}