1use std::io::ErrorKind;
16use std::path::PathBuf;
17
18use async_trait::async_trait;
19use common_error::ext::BoxedError;
20use common_telemetry::{debug, warn};
21use futures::{AsyncRead, AsyncWrite};
22use index::error as index_error;
23use index::error::Result as IndexResult;
24use index::external_provider::ExternalTempFileProvider;
25use object_store::util::{self, normalize_dir};
26use snafu::ResultExt;
27use store_api::storage::{ColumnId, FileId, RegionId};
28use uuid::Uuid;
29
30use crate::access_layer::new_fs_cache_store;
31use crate::error::Result;
32use crate::metrics::{
33 INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
34 INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
35 INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
36};
37use crate::sst::index::store::InstrumentedStore;
38
/// Name of the directory, relative to the auxiliary path, that holds every
/// intermediate file handled by this module.
const INTERMEDIATE_DIR: &str = "__intm";
40
/// Owns the store used for intermediate files and knows how to build and
/// prune paths beneath [`INTERMEDIATE_DIR`].
#[derive(Clone)]
pub struct IntermediateManager {
    /// The auxiliary path given to [`IntermediateManager::init_fs`].
    base_dir: PathBuf,
    /// Instrumented store created from the normalized auxiliary path.
    store: InstrumentedStore,
}
47
48impl IntermediateManager {
49 pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
52 common_telemetry::info!(
53 "Initializing intermediate manager, aux_path: {}",
54 aux_path.as_ref()
55 );
56
57 let aux_pb = PathBuf::from(aux_path.as_ref());
59 let intm_dir = aux_pb.join(INTERMEDIATE_DIR);
60 let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4()));
61 match tokio::fs::rename(&intm_dir, &deleted_dir).await {
62 Ok(_) => {
63 tokio::spawn(async move {
64 if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await {
65 warn!(err; "Failed to remove intermediate directory");
66 }
67 });
68 }
69 Err(err) => {
70 if err.kind() != ErrorKind::NotFound {
71 warn!(err; "Failed to rename intermediate directory");
72 }
73 }
74 }
75
76 let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
77 let store = InstrumentedStore::new(store);
78
79 Ok(Self {
80 base_dir: PathBuf::from(aux_path.as_ref()),
81 store,
82 })
83 }
84
85 pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
87 self.store = self.store.with_write_buffer_size(write_buffer_size);
88 self
89 }
90
91 pub(crate) fn store(&self) -> &InstrumentedStore {
93 &self.store
94 }
95
96 pub(crate) fn fulltext_path(
99 &self,
100 region_id: &RegionId,
101 sst_file_id: &FileId,
102 column_id: ColumnId,
103 ) -> PathBuf {
104 let uuid = Uuid::new_v4();
105 self.base_dir
106 .join(INTERMEDIATE_DIR)
107 .join(region_id.as_u64().to_string())
108 .join(sst_file_id.to_string())
109 .join(format!("fulltext-{column_id}-{uuid}"))
110 }
111
112 pub(crate) async fn prune_sst_dir(
114 &self,
115 region_id: &RegionId,
116 sst_file_id: &FileId,
117 ) -> Result<()> {
118 let region_id = region_id.as_u64();
119 let sst_dir = format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/");
120 self.store.remove_all(&sst_dir).await
121 }
122
123 pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> {
125 let region_id = region_id.as_u64();
126 let region_dir = format!("{INTERMEDIATE_DIR}/{region_id}/");
127 self.store.remove_all(®ion_dir).await
128 }
129}
130
/// Store-relative directory that holds the intermediate files of one index
/// build for one SST file.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
    /// Directory of the form `__intm/{region_id}/{sst_file_id}/{uuid}/`.
    files_dir: String,
}
137
138impl IntermediateLocation {
139 pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self {
144 let region_id = region_id.as_u64();
145 let uuid = Uuid::new_v4();
146 Self {
147 files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
148 }
149 }
150
151 pub fn dir_to_cleanup(&self) -> &str {
153 &self.files_dir
154 }
155
156 pub fn file_group_path(&self, file_group: &str) -> String {
159 util::join_path(&self.files_dir, &format!("{file_group}/"))
160 }
161
162 pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
165 util::join_path(
166 &self.file_group_path(file_group),
167 &format!("{im_file_id}.im"),
168 )
169 }
170
171 pub fn im_file_id_from_path(&self, path: &str) -> String {
173 path.rsplit('/')
174 .next()
175 .and_then(|s| s.strip_suffix(".im"))
176 .unwrap_or_default()
177 .to_string()
178 }
179}
180
/// [`ExternalTempFileProvider`] implementation that keeps its temporary files
/// in the store of an [`IntermediateManager`].
pub(crate) struct TempFileProvider {
    /// Directory under which this provider creates and lists files.
    location: IntermediateLocation,
    /// Gives access to the store used for all reads, writes and removals.
    manager: IntermediateManager,
}
189
190#[async_trait]
191impl ExternalTempFileProvider for TempFileProvider {
192 async fn create(
193 &self,
194 file_group: &str,
195 file_id: &str,
196 ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
197 let path = self.location.file_path(file_group, file_id);
198 let writer = self
199 .manager
200 .store()
201 .writer(
202 &path,
203 &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
204 &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
205 &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
206 )
207 .await
208 .map_err(BoxedError::new)
209 .context(index_error::ExternalSnafu)?;
210 Ok(Box::new(writer))
211 }
212
213 async fn read_all(
214 &self,
215 file_group: &str,
216 ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
217 let file_group_path = self.location.file_group_path(file_group);
218 let entries = self
219 .manager
220 .store()
221 .list(&file_group_path)
222 .await
223 .map_err(BoxedError::new)
224 .context(index_error::ExternalSnafu)?;
225 let mut readers = Vec::with_capacity(entries.len());
226
227 for entry in entries {
228 if entry.metadata().is_dir() {
229 debug!("Unexpected entry in index creation dir: {:?}", entry.path());
231 continue;
232 }
233
234 let im_file_id = self.location.im_file_id_from_path(entry.path());
235
236 let reader = self
237 .manager
238 .store()
239 .reader(
240 entry.path(),
241 &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
242 &INDEX_INTERMEDIATE_READ_OP_TOTAL,
243 &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
244 )
245 .await
246 .map_err(BoxedError::new)
247 .context(index_error::ExternalSnafu)?;
248 readers.push((im_file_id, Box::new(reader) as _));
249 }
250
251 Ok(readers)
252 }
253}
254
255impl TempFileProvider {
256 pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
258 Self { location, manager }
259 }
260
261 pub async fn cleanup(&self) -> Result<()> {
263 self.manager
264 .store()
265 .remove_all(self.location.dir_to_cleanup())
266 .await
267 }
268}
269
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;

    use common_test_util::temp_dir;
    use futures::{AsyncReadExt, AsyncWriteExt};
    use regex::Regex;
    use store_api::storage::{FileId, RegionId};

    use super::*;

    /// `init_fs` must clear out an intermediate directory left by a previous
    /// run.
    #[tokio::test]
    async fn test_manager() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate");
        let path = temp_dir.path().to_str().unwrap();

        // Plant a leftover file in the intermediate directory.
        tokio::fs::create_dir_all(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap();
        tokio::fs::write(format!("{path}/{INTERMEDIATE_DIR}/garbage.im"), "blahblah")
            .await
            .unwrap();

        let _manager = IntermediateManager::init_fs(path).await.unwrap();

        // The intermediate directory is gone after initialization.
        assert!(
            !tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}"))
                .await
                .unwrap()
        );
    }

    /// Provider cleanup, SST pruning and region pruning each remove exactly
    /// their own directory level.
    #[tokio::test]
    async fn test_cleanup_dir() {
        let temp_dir = temp_dir::create_temp_dir("test_cleanup_dir_");

        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let region_dir = temp_dir
            .path()
            .join(INTERMEDIATE_DIR)
            .join(region_id.as_u64().to_string());
        let sst_dir = region_dir.join(sst_file_id.to_string());

        let path = temp_dir.path().to_str().unwrap();
        let manager = IntermediateManager::init_fs(path).await.unwrap();

        let location = IntermediateLocation::new(&region_id, &sst_file_id);
        let temp_file_provider = TempFileProvider::new(location, manager.clone());

        let mut f1 = temp_file_provider
            .create("sky", "000000000000")
            .await
            .unwrap();
        f1.write_all(b"hello").await.unwrap();
        f1.flush().await.unwrap();
        f1.close().await.unwrap();

        let mut f2 = temp_file_provider
            .create("sky", "000000000001")
            .await
            .unwrap();
        f2.write_all(b"world").await.unwrap();
        f2.flush().await.unwrap();
        f2.close().await.unwrap();

        temp_file_provider.cleanup().await.unwrap();

        // Provider cleanup leaves the SST and region directories in place.
        assert!(tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());

        // Pruning the SST directory keeps the region directory.
        manager
            .prune_sst_dir(&region_id, &sst_file_id)
            .await
            .unwrap();
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());

        // Pruning the region directory removes everything.
        manager.prune_region_dir(&region_id).await.unwrap();
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&region_dir).await.unwrap());
    }

    /// Checks the layout of every path derived from an `IntermediateLocation`.
    #[test]
    fn test_intermediate_location() {
        let sst_file_id = FileId::random();
        let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id);

        let re = Regex::new(&format!(
            "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/",
            r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
        ))
        .unwrap();
        assert!(re.is_match(&location.files_dir));

        // The UUID is the fourth `/`-separated segment of `files_dir`.
        let uuid = location.files_dir.split('/').nth(3).unwrap();

        let file_group = "1";
        assert_eq!(
            location.file_group_path(file_group),
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/")
        );

        let im_file_id = "000000000010";
        let file_path = location.file_path(file_group, im_file_id);
        assert_eq!(
            file_path,
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im")
        );

        assert_eq!(location.im_file_id_from_path(&file_path), im_file_id);
    }

    /// Checks the fulltext path component by component.
    #[tokio::test]
    async fn test_fulltext_intm_path() {
        let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_");
        let aux_path = temp_dir.path().to_string_lossy().to_string();

        let manager = IntermediateManager::init_fs(&aux_path).await.unwrap();
        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let column_id = 1;
        let fulltext_path = manager.fulltext_path(&region_id, &sst_file_id, column_id);

        let mut pi = fulltext_path.iter();
        // The path starts with the components of the auxiliary path...
        for a in temp_dir.path().iter() {
            assert_eq!(a, pi.next().unwrap());
        }
        // ...followed by the intermediate directory, region id and SST file id...
        assert_eq!(pi.next().unwrap(), INTERMEDIATE_DIR);
        assert_eq!(pi.next().unwrap(), "0");
        assert_eq!(pi.next().unwrap(), OsStr::new(&sst_file_id.to_string()));
        // ...and ends with `fulltext-{column_id}-{uuid}`.
        assert!(
            Regex::new(r"fulltext-1-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}")
                .unwrap()
                .is_match(&pi.next().unwrap().to_string_lossy())
        );
        assert!(pi.next().is_none());
    }

    /// Writes files into two groups, reads them back per group, then cleans up.
    #[tokio::test]
    async fn test_temp_file_provider_basic() {
        let temp_dir = temp_dir::create_temp_dir("intermediate");
        let path = temp_dir.path().display().to_string();

        let region_id = RegionId::new(0, 0);
        let location = IntermediateLocation::new(&region_id, &FileId::random());
        let store = IntermediateManager::init_fs(path).await.unwrap();
        let provider = TempFileProvider::new(location.clone(), store);

        let file_group = "tag0";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"hello").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_id = "0000000100";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"world").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_group = "tag1";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"foo").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        // Listing order is not asserted, so accept either payload per reader.
        let readers = provider.read_all("tag0").await.unwrap();
        assert_eq!(readers.len(), 2);
        for (_, mut reader) in readers {
            let mut buf = Vec::new();
            reader.read_to_end(&mut buf).await.unwrap();
            assert!(matches!(buf.as_slice(), b"hello" | b"world"));
        }
        let readers = provider.read_all("tag1").await.unwrap();
        assert_eq!(readers.len(), 1);
        let mut reader = readers.into_iter().map(|x| x.1).next().unwrap();
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"foo");

        provider.cleanup().await.unwrap();

        // Nothing is left under the provider's directory after cleanup.
        assert!(
            provider
                .manager
                .store()
                .list(location.dir_to_cleanup())
                .await
                .unwrap()
                .is_empty()
        );
    }
}