1use std::path::PathBuf;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_telemetry::{debug, warn};
20use futures::{AsyncRead, AsyncWrite};
21use index::error as index_error;
22use index::error::Result as IndexResult;
23use index::external_provider::ExternalTempFileProvider;
24use object_store::util::{self, normalize_dir};
25use snafu::ResultExt;
26use store_api::storage::{ColumnId, RegionId};
27use uuid::Uuid;
28
29use crate::access_layer::new_fs_cache_store;
30use crate::error::Result;
31use crate::metrics::{
32 INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
33 INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
34 INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
35};
36use crate::sst::file::FileId;
37use crate::sst::index::store::InstrumentedStore;
38
/// Name of the directory, directly under the auxiliary path, that contains
/// every intermediate file produced by this module.
const INTERMEDIATE_DIR: &str = "__intm";
40
/// Entry point for working with intermediate files kept below an auxiliary
/// directory.
///
/// The type derives `Clone`, so one manager can be shared by several users.
#[derive(Clone)]
pub struct IntermediateManager {
    /// The auxiliary path given to `init_fs`; prefix of the paths returned by
    /// `fulltext_path`.
    base_dir: PathBuf,
    /// Instrumented store built by `init_fs` for the auxiliary path; all
    /// store-relative paths (`__intm/...`) are resolved through it.
    store: InstrumentedStore,
}
47
48impl IntermediateManager {
49 pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
52 common_telemetry::info!(
53 "Initializing intermediate manager, aux_path: {}",
54 aux_path.as_ref()
55 );
56
57 let aux_pb = PathBuf::from(aux_path.as_ref());
59 let intm_dir = aux_pb.join(INTERMEDIATE_DIR);
60 let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4()));
61 if let Err(err) = tokio::fs::rename(&intm_dir, &deleted_dir).await {
62 warn!(err; "Failed to rename intermediate directory");
63 }
64 tokio::spawn(async move {
65 if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await {
66 warn!(err; "Failed to remove intermediate directory");
67 }
68 });
69
70 let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
71 let store = InstrumentedStore::new(store);
72
73 Ok(Self {
74 base_dir: PathBuf::from(aux_path.as_ref()),
75 store,
76 })
77 }
78
79 pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
81 self.store = self.store.with_write_buffer_size(write_buffer_size);
82 self
83 }
84
85 pub(crate) fn store(&self) -> &InstrumentedStore {
87 &self.store
88 }
89
90 pub(crate) fn fulltext_path(
93 &self,
94 region_id: &RegionId,
95 sst_file_id: &FileId,
96 column_id: ColumnId,
97 ) -> PathBuf {
98 let uuid = Uuid::new_v4();
99 self.base_dir
100 .join(INTERMEDIATE_DIR)
101 .join(region_id.as_u64().to_string())
102 .join(sst_file_id.to_string())
103 .join(format!("fulltext-{column_id}-{uuid}"))
104 }
105
106 pub(crate) async fn prune_sst_dir(
108 &self,
109 region_id: &RegionId,
110 sst_file_id: &FileId,
111 ) -> Result<()> {
112 let region_id = region_id.as_u64();
113 let sst_dir = format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/");
114 self.store.remove_all(&sst_dir).await
115 }
116
117 pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> {
119 let region_id = region_id.as_u64();
120 let region_dir = format!("{INTERMEDIATE_DIR}/{region_id}/");
121 self.store.remove_all(®ion_dir).await
122 }
123}
124
/// Builds the store paths for one group of intermediate files belonging to a
/// single region and SST file.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
    /// Root directory of this location, in the form
    /// `__intm/{region_id}/{sst_file_id}/{uuid}/` (always ends with `/`).
    files_dir: String,
}
131
132impl IntermediateLocation {
133 pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self {
138 let region_id = region_id.as_u64();
139 let uuid = Uuid::new_v4();
140 Self {
141 files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
142 }
143 }
144
145 pub fn dir_to_cleanup(&self) -> &str {
147 &self.files_dir
148 }
149
150 pub fn file_group_path(&self, file_group: &str) -> String {
153 util::join_path(&self.files_dir, &format!("{file_group}/"))
154 }
155
156 pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
159 util::join_path(
160 &self.file_group_path(file_group),
161 &format!("{im_file_id}.im"),
162 )
163 }
164
165 pub fn im_file_id_from_path(&self, path: &str) -> String {
167 path.rsplit('/')
168 .next()
169 .and_then(|s| s.strip_suffix(".im"))
170 .unwrap_or_default()
171 .to_string()
172 }
173}
174
/// Implementation of `ExternalTempFileProvider` that creates and reads
/// intermediate files through an `IntermediateManager`'s store.
pub(crate) struct TempFileProvider {
    /// Supplies the paths of the intermediate files.
    location: IntermediateLocation,
    /// Supplies the store used to read, write, list and remove those files.
    manager: IntermediateManager,
}
183
184#[async_trait]
185impl ExternalTempFileProvider for TempFileProvider {
186 async fn create(
187 &self,
188 file_group: &str,
189 file_id: &str,
190 ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
191 let path = self.location.file_path(file_group, file_id);
192 let writer = self
193 .manager
194 .store()
195 .writer(
196 &path,
197 &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
198 &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
199 &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
200 )
201 .await
202 .map_err(BoxedError::new)
203 .context(index_error::ExternalSnafu)?;
204 Ok(Box::new(writer))
205 }
206
207 async fn read_all(
208 &self,
209 file_group: &str,
210 ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
211 let file_group_path = self.location.file_group_path(file_group);
212 let entries = self
213 .manager
214 .store()
215 .list(&file_group_path)
216 .await
217 .map_err(BoxedError::new)
218 .context(index_error::ExternalSnafu)?;
219 let mut readers = Vec::with_capacity(entries.len());
220
221 for entry in entries {
222 if entry.metadata().is_dir() {
223 debug!("Unexpected entry in index creation dir: {:?}", entry.path());
225 continue;
226 }
227
228 let im_file_id = self.location.im_file_id_from_path(entry.path());
229
230 let reader = self
231 .manager
232 .store()
233 .reader(
234 entry.path(),
235 &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
236 &INDEX_INTERMEDIATE_READ_OP_TOTAL,
237 &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
238 )
239 .await
240 .map_err(BoxedError::new)
241 .context(index_error::ExternalSnafu)?;
242 readers.push((im_file_id, Box::new(reader) as _));
243 }
244
245 Ok(readers)
246 }
247}
248
249impl TempFileProvider {
250 pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
252 Self { location, manager }
253 }
254
255 pub async fn cleanup(&self) -> Result<()> {
257 self.manager
258 .store()
259 .remove_all(self.location.dir_to_cleanup())
260 .await
261 }
262}
263
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;

    use common_test_util::temp_dir;
    use futures::{AsyncReadExt, AsyncWriteExt};
    use regex::Regex;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::file::FileId;

    /// `init_fs` must move a pre-existing intermediate directory out of the way.
    #[tokio::test]
    async fn test_manager() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate");
        let path = temp_dir.path().to_str().unwrap();

        // Leave a garbage file behind, as a previous run might have.
        tokio::fs::create_dir_all(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap();
        tokio::fs::write(format!("{path}/{INTERMEDIATE_DIR}/garbage.im"), "blahblah")
            .await
            .unwrap();

        let _manager = IntermediateManager::init_fs(path).await.unwrap();

        // The directory is renamed before `init_fs` returns, so it is gone here.
        assert!(!tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap());
    }

    /// Provider cleanup, SST pruning and region pruning each remove one more
    /// level of the directory tree.
    #[tokio::test]
    async fn test_cleanup_dir() {
        let temp_dir = temp_dir::create_temp_dir("test_cleanup_dir_");

        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let region_dir = temp_dir
            .path()
            .join(INTERMEDIATE_DIR)
            .join(region_id.as_u64().to_string());
        let sst_dir = region_dir.join(sst_file_id.to_string());

        let path = temp_dir.path().to_str().unwrap();
        let manager = IntermediateManager::init_fs(path).await.unwrap();

        let location = IntermediateLocation::new(&region_id, &sst_file_id);
        let temp_file_provider = TempFileProvider::new(location, manager.clone());

        let mut f1 = temp_file_provider
            .create("sky", "000000000000")
            .await
            .unwrap();
        f1.write_all(b"hello").await.unwrap();
        f1.flush().await.unwrap();
        f1.close().await.unwrap();

        let mut f2 = temp_file_provider
            .create("sky", "000000000001")
            .await
            .unwrap();
        f2.write_all(b"world").await.unwrap();
        f2.flush().await.unwrap();
        f2.close().await.unwrap();

        temp_file_provider.cleanup().await.unwrap();

        // Provider cleanup only removes its own uuid directory:
        // the SST and region directories still exist.
        assert!(tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());

        // Pruning the SST directory keeps the region directory.
        manager
            .prune_sst_dir(&region_id, &sst_file_id)
            .await
            .unwrap();
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());

        // Pruning the region directory removes both.
        manager.prune_region_dir(&region_id).await.unwrap();
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&region_dir).await.unwrap());
    }

    /// Checks the exact shape of every path produced by `IntermediateLocation`.
    #[test]
    fn test_intermediate_location() {
        let sst_file_id = FileId::random();
        let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id);

        let re = Regex::new(&format!(
            "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/",
            r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
        ))
        .unwrap();
        assert!(re.is_match(&location.files_dir));

        // Segments: 0 = __intm, 1 = region id, 2 = sst file id, 3 = uuid.
        let uuid = location.files_dir.split('/').nth(3).unwrap();

        let file_group = "1";
        assert_eq!(
            location.file_group_path(file_group),
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/")
        );

        let im_file_id = "000000000010";
        let file_path = location.file_path(file_group, im_file_id);
        assert_eq!(
            file_path,
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im")
        );

        assert_eq!(location.im_file_id_from_path(&file_path), im_file_id);
    }

    /// Checks the fulltext path component by component.
    #[tokio::test]
    async fn test_fulltext_intm_path() {
        let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_");
        let aux_path = temp_dir.path().to_string_lossy().to_string();

        let manager = IntermediateManager::init_fs(&aux_path).await.unwrap();
        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let column_id = 1;
        let fulltext_path = manager.fulltext_path(&region_id, &sst_file_id, column_id);

        let mut pi = fulltext_path.iter();
        // The path starts with the auxiliary directory.
        for a in temp_dir.path().iter() {
            assert_eq!(a, pi.next().unwrap());
        }
        assert_eq!(pi.next().unwrap(), INTERMEDIATE_DIR);
        // Region id.
        assert_eq!(pi.next().unwrap(), "0");
        // SST file id.
        assert_eq!(pi.next().unwrap(), OsStr::new(&sst_file_id.to_string()));
        // `fulltext-{column_id}-{uuid}`.
        assert!(Regex::new(r"fulltext-1-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}")
            .unwrap()
            .is_match(&pi.next().unwrap().to_string_lossy()));
        // Nothing follows the fulltext component.
        assert!(pi.next().is_none());
    }

    /// Writes files into two groups, reads them back per group, then cleans up.
    #[tokio::test]
    async fn test_temp_file_provider_basic() {
        let temp_dir = temp_dir::create_temp_dir("intermediate");
        let path = temp_dir.path().display().to_string();

        let region_id = RegionId::new(0, 0);
        let location = IntermediateLocation::new(&region_id, &FileId::random());
        let store = IntermediateManager::init_fs(path).await.unwrap();
        let provider = TempFileProvider::new(location.clone(), store);

        let file_group = "tag0";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"hello").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_id = "0000000100";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"world").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_group = "tag1";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"foo").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        // Listing order is not asserted, so accept either content per reader.
        let readers = provider.read_all("tag0").await.unwrap();
        assert_eq!(readers.len(), 2);
        for (_, mut reader) in readers {
            let mut buf = Vec::new();
            reader.read_to_end(&mut buf).await.unwrap();
            assert!(matches!(buf.as_slice(), b"hello" | b"world"));
        }
        let readers = provider.read_all("tag1").await.unwrap();
        assert_eq!(readers.len(), 1);
        let mut reader = readers.into_iter().map(|x| x.1).next().unwrap();
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"foo");

        provider.cleanup().await.unwrap();

        // After cleanup the location's directory lists as empty.
        assert!(provider
            .manager
            .store()
            .list(location.dir_to_cleanup())
            .await
            .unwrap()
            .is_empty());
    }
}