// mito2/sst/index/intermediate.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::ErrorKind;
use std::path::PathBuf;

use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::{debug, warn};
use futures::{AsyncRead, AsyncWrite};
use index::error as index_error;
use index::error::Result as IndexResult;
use index::external_provider::ExternalTempFileProvider;
use object_store::util::{self, normalize_dir};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
use uuid::Uuid;

use crate::access_layer::new_fs_cache_store;
use crate::error::Result;
use crate::metrics::{
    INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
    INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
    INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
};
use crate::sst::file::FileId;
use crate::sst::index::store::InstrumentedStore;

/// Name of the directory, directly under the aux path, that holds all intermediate files.
const INTERMEDIATE_DIR: &str = "__intm";

41/// `IntermediateManager` provides store to access to intermediate files.
42#[derive(Clone)]
43pub struct IntermediateManager {
44    base_dir: PathBuf,
45    store: InstrumentedStore,
46}
47
48impl IntermediateManager {
49    /// Create a new `IntermediateManager` with the given root path.
50    /// It will clean up all garbage intermediate files from previous runs.
51    pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
52        common_telemetry::info!(
53            "Initializing intermediate manager, aux_path: {}",
54            aux_path.as_ref()
55        );
56
57        // Remove the intermediate directory on bankground
58        let aux_pb = PathBuf::from(aux_path.as_ref());
59        let intm_dir = aux_pb.join(INTERMEDIATE_DIR);
60        let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4()));
61        if let Err(err) = tokio::fs::rename(&intm_dir, &deleted_dir).await {
62            warn!(err; "Failed to rename intermediate directory");
63        }
64        tokio::spawn(async move {
65            if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await {
66                warn!(err; "Failed to remove intermediate directory");
67            }
68        });
69
70        let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
71        let store = InstrumentedStore::new(store);
72
73        Ok(Self {
74            base_dir: PathBuf::from(aux_path.as_ref()),
75            store,
76        })
77    }
78
79    /// Set the write buffer size for the store.
80    pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
81        self.store = self.store.with_write_buffer_size(write_buffer_size);
82        self
83    }
84
85    /// Returns the store to access to intermediate files.
86    pub(crate) fn store(&self) -> &InstrumentedStore {
87        &self.store
88    }
89
90    /// Returns the intermediate directory path for building fulltext index.
91    /// The format is `{aux_path}/__intm/{region_id}/{sst_file_id}/fulltext-{column_id}-{uuid}`.
92    pub(crate) fn fulltext_path(
93        &self,
94        region_id: &RegionId,
95        sst_file_id: &FileId,
96        column_id: ColumnId,
97    ) -> PathBuf {
98        let uuid = Uuid::new_v4();
99        self.base_dir
100            .join(INTERMEDIATE_DIR)
101            .join(region_id.as_u64().to_string())
102            .join(sst_file_id.to_string())
103            .join(format!("fulltext-{column_id}-{uuid}"))
104    }
105
106    /// Prunes the intermediate directory for SST files.
107    pub(crate) async fn prune_sst_dir(
108        &self,
109        region_id: &RegionId,
110        sst_file_id: &FileId,
111    ) -> Result<()> {
112        let region_id = region_id.as_u64();
113        let sst_dir = format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/");
114        self.store.remove_all(&sst_dir).await
115    }
116
117    /// Prunes the intermediate directory for region files.
118    pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> {
119        let region_id = region_id.as_u64();
120        let region_dir = format!("{INTERMEDIATE_DIR}/{region_id}/");
121        self.store.remove_all(&region_dir).await
122    }
123}
124
/// `IntermediateLocation` produces the paths of the intermediate files
/// used during external sorting.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
    /// Store-relative root directory: `__intm/{region_id}/{sst_file_id}/{uuid}/`.
    files_dir: String,
}

132impl IntermediateLocation {
133    /// Create a new `IntermediateLocation`. Set the root directory to
134    /// `__intm/{region_id}/{sst_file_id}/{uuid}/`, incorporating
135    /// uuid to differentiate active sorting files from orphaned data due to unexpected
136    /// process termination.
137    pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self {
138        let region_id = region_id.as_u64();
139        let uuid = Uuid::new_v4();
140        Self {
141            files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
142        }
143    }
144
145    /// Returns the directory to clean up when the sorting is done
146    pub fn dir_to_cleanup(&self) -> &str {
147        &self.files_dir
148    }
149
150    /// Returns the path of the directory for intermediate files associated with the `file_group`:
151    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/`
152    pub fn file_group_path(&self, file_group: &str) -> String {
153        util::join_path(&self.files_dir, &format!("{file_group}/"))
154    }
155
156    /// Returns the path of the intermediate file with the given `file_group` and `im_file_id`:
157    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im`
158    pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
159        util::join_path(
160            &self.file_group_path(file_group),
161            &format!("{im_file_id}.im"),
162        )
163    }
164
165    /// Returns the intermediate file id from the path.
166    pub fn im_file_id_from_path(&self, path: &str) -> String {
167        path.rsplit('/')
168            .next()
169            .and_then(|s| s.strip_suffix(".im"))
170            .unwrap_or_default()
171            .to_string()
172    }
173}
174
175/// `TempFileProvider` implements `ExternalTempFileProvider`.
176/// It uses `InstrumentedStore` to create and read intermediate files.
177pub(crate) struct TempFileProvider {
178    /// Provides the location of intermediate files.
179    location: IntermediateLocation,
180    /// Provides store to access to intermediate files.
181    manager: IntermediateManager,
182}
183
184#[async_trait]
185impl ExternalTempFileProvider for TempFileProvider {
186    async fn create(
187        &self,
188        file_group: &str,
189        file_id: &str,
190    ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
191        let path = self.location.file_path(file_group, file_id);
192        let writer = self
193            .manager
194            .store()
195            .writer(
196                &path,
197                &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
198                &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
199                &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
200            )
201            .await
202            .map_err(BoxedError::new)
203            .context(index_error::ExternalSnafu)?;
204        Ok(Box::new(writer))
205    }
206
207    async fn read_all(
208        &self,
209        file_group: &str,
210    ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
211        let file_group_path = self.location.file_group_path(file_group);
212        let entries = self
213            .manager
214            .store()
215            .list(&file_group_path)
216            .await
217            .map_err(BoxedError::new)
218            .context(index_error::ExternalSnafu)?;
219        let mut readers = Vec::with_capacity(entries.len());
220
221        for entry in entries {
222            if entry.metadata().is_dir() {
223                // todo(hl): we can keep this warning once we find a way to filter self in list result.
224                debug!("Unexpected entry in index creation dir: {:?}", entry.path());
225                continue;
226            }
227
228            let im_file_id = self.location.im_file_id_from_path(entry.path());
229
230            let reader = self
231                .manager
232                .store()
233                .reader(
234                    entry.path(),
235                    &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
236                    &INDEX_INTERMEDIATE_READ_OP_TOTAL,
237                    &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
238                )
239                .await
240                .map_err(BoxedError::new)
241                .context(index_error::ExternalSnafu)?;
242            readers.push((im_file_id, Box::new(reader) as _));
243        }
244
245        Ok(readers)
246    }
247}
248
249impl TempFileProvider {
250    /// Creates a new `TempFileProvider`.
251    pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
252        Self { location, manager }
253    }
254
255    /// Removes all intermediate files.
256    pub async fn cleanup(&self) -> Result<()> {
257        self.manager
258            .store()
259            .remove_all(self.location.dir_to_cleanup())
260            .await
261    }
262}
263
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;

    use common_test_util::temp_dir;
    use futures::{AsyncReadExt, AsyncWriteExt};
    use regex::Regex;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::file::FileId;

    #[tokio::test]
    async fn test_manager() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate");
        let path = temp_dir.path().to_str().unwrap();

        // write a garbage file
        tokio::fs::create_dir_all(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap();
        tokio::fs::write(format!("{path}/{INTERMEDIATE_DIR}/garbage.im"), "blahblah")
            .await
            .unwrap();

        let _manager = IntermediateManager::init_fs(path).await.unwrap();

        // cleaned up by `init_fs`
        assert!(!tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap());
    }

    #[tokio::test]
    async fn test_manager_without_previous_intermediate_dir() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate_fresh");
        let path = temp_dir.path().to_str().unwrap();

        // Nothing is left over from a previous run, e.g. on the very first start.
        assert!(!tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap());

        // `init_fs` must still succeed when there is nothing to clean up.
        let _manager = IntermediateManager::init_fs(path).await.unwrap();
    }

    #[tokio::test]
    async fn test_cleanup_dir() {
        let temp_dir = temp_dir::create_temp_dir("test_cleanup_dir_");

        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let region_dir = temp_dir
            .path()
            .join(INTERMEDIATE_DIR)
            .join(region_id.as_u64().to_string());
        let sst_dir = region_dir.join(sst_file_id.to_string());

        let path = temp_dir.path().to_str().unwrap();
        let manager = IntermediateManager::init_fs(path).await.unwrap();

        let location = IntermediateLocation::new(&region_id, &sst_file_id);
        let temp_file_provider = TempFileProvider::new(location, manager.clone());

        let mut f1 = temp_file_provider
            .create("sky", "000000000000")
            .await
            .unwrap();
        f1.write_all(b"hello").await.unwrap();
        f1.flush().await.unwrap();
        f1.close().await.unwrap();

        let mut f2 = temp_file_provider
            .create("sky", "000000000001")
            .await
            .unwrap();
        f2.write_all(b"world").await.unwrap();
        f2.flush().await.unwrap();
        f2.close().await.unwrap();

        temp_file_provider.cleanup().await.unwrap();

        // sst_dir and region_dir still exists
        assert!(tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());

        // sst_dir should be deleted, region_dir still exists
        manager
            .prune_sst_dir(&region_id, &sst_file_id)
            .await
            .unwrap();
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());

        // sst_dir, region_dir should be deleted
        manager.prune_region_dir(&region_id).await.unwrap();
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&region_dir).await.unwrap());
    }

    #[test]
    fn test_intermediate_location() {
        let sst_file_id = FileId::random();
        let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id);

        let re = Regex::new(&format!(
            "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/",
            r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
        ))
        .unwrap();
        assert!(re.is_match(&location.files_dir));

        let uuid = location.files_dir.split('/').nth(3).unwrap();

        let file_group = "1";
        assert_eq!(
            location.file_group_path(file_group),
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/")
        );

        let im_file_id = "000000000010";
        let file_path = location.file_path(file_group, im_file_id);
        assert_eq!(
            file_path,
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im")
        );

        assert_eq!(location.im_file_id_from_path(&file_path), im_file_id);
    }

    #[test]
    fn test_im_file_id_from_path_without_im_suffix() {
        let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random());

        // A bare file name without any directory component.
        assert_eq!(location.im_file_id_from_path("000000000010.im"), "000000000010");
        // The last segment is not an intermediate file.
        assert_eq!(location.im_file_id_from_path("a/b/not-an-im-file.txt"), "");
        // A directory path has an empty last segment.
        assert_eq!(location.im_file_id_from_path("a/b/"), "");
    }

    #[tokio::test]
    async fn test_fulltext_intm_path() {
        let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_");
        let aux_path = temp_dir.path().to_string_lossy().to_string();

        let manager = IntermediateManager::init_fs(&aux_path).await.unwrap();
        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let column_id = 1;
        let fulltext_path = manager.fulltext_path(&region_id, &sst_file_id, column_id);

        let mut pi = fulltext_path.iter();
        for a in temp_dir.path().iter() {
            assert_eq!(a, pi.next().unwrap());
        }
        assert_eq!(pi.next().unwrap(), INTERMEDIATE_DIR);
        assert_eq!(pi.next().unwrap(), "0"); // region id
        assert_eq!(pi.next().unwrap(), OsStr::new(&sst_file_id.to_string())); // sst file id
        assert!(Regex::new(r"fulltext-1-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}")
            .unwrap()
            .is_match(&pi.next().unwrap().to_string_lossy())); // fulltext path
        assert!(pi.next().is_none());
    }

    #[tokio::test]
    async fn test_temp_file_provider_basic() {
        let temp_dir = temp_dir::create_temp_dir("intermediate");
        let path = temp_dir.path().display().to_string();

        let region_id = RegionId::new(0, 0);
        let location = IntermediateLocation::new(&region_id, &FileId::random());
        let store = IntermediateManager::init_fs(path).await.unwrap();
        let provider = TempFileProvider::new(location.clone(), store);

        let file_group = "tag0";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"hello").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_id = "0000000100";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"world").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_group = "tag1";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"foo").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let readers = provider.read_all("tag0").await.unwrap();
        assert_eq!(readers.len(), 2);
        for (_, mut reader) in readers {
            let mut buf = Vec::new();
            reader.read_to_end(&mut buf).await.unwrap();
            assert!(matches!(buf.as_slice(), b"hello" | b"world"));
        }
        let readers = provider.read_all("tag1").await.unwrap();
        assert_eq!(readers.len(), 1);
        let mut reader = readers.into_iter().map(|x| x.1).next().unwrap();
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"foo");

        provider.cleanup().await.unwrap();

        assert!(provider
            .manager
            .store()
            .list(location.dir_to_cleanup())
            .await
            .unwrap()
            .is_empty());
    }
}