mito2/sst/index/
intermediate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::ErrorKind;
16use std::path::PathBuf;
17
18use async_trait::async_trait;
19use common_error::ext::BoxedError;
20use common_telemetry::{debug, warn};
21use futures::{AsyncRead, AsyncWrite};
22use index::error as index_error;
23use index::error::Result as IndexResult;
24use index::external_provider::ExternalTempFileProvider;
25use object_store::util::{self, normalize_dir};
26use snafu::ResultExt;
27use store_api::storage::{ColumnId, FileId, RegionId};
28use uuid::Uuid;
29
30use crate::access_layer::new_fs_cache_store;
31use crate::error::Result;
32use crate::metrics::{
33    INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
34    INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
35    INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
36};
37use crate::sst::index::store::InstrumentedStore;
38
/// Name of the directory that holds intermediate files.
///
/// It is joined onto the auxiliary path in `IntermediateManager::init_fs` and is the
/// first segment of every store-relative intermediate path built in this file.
const INTERMEDIATE_DIR: &str = "__intm";
40
/// `IntermediateManager` provides the store used to access intermediate files
/// and the helpers to build and prune paths below the `__intm` directory.
#[derive(Clone)]
pub struct IntermediateManager {
    /// The auxiliary path passed to `init_fs`; `fulltext_path` builds its result beneath it.
    base_dir: PathBuf,
    /// Instrumented store created in `init_fs` from the normalized auxiliary path.
    store: InstrumentedStore,
}
47
48impl IntermediateManager {
49    /// Create a new `IntermediateManager` with the given root path.
50    /// It will clean up all garbage intermediate files from previous runs.
51    pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
52        common_telemetry::info!(
53            "Initializing intermediate manager, aux_path: {}",
54            aux_path.as_ref()
55        );
56
57        // Remove the intermediate directory on bankground
58        let aux_pb = PathBuf::from(aux_path.as_ref());
59        let intm_dir = aux_pb.join(INTERMEDIATE_DIR);
60        let deleted_dir = intm_dir.with_extension(format!("deleted-{}", Uuid::new_v4()));
61        match tokio::fs::rename(&intm_dir, &deleted_dir).await {
62            Ok(_) => {
63                tokio::spawn(async move {
64                    if let Err(err) = tokio::fs::remove_dir_all(deleted_dir).await {
65                        warn!(err; "Failed to remove intermediate directory");
66                    }
67                });
68            }
69            Err(err) => {
70                if err.kind() != ErrorKind::NotFound {
71                    warn!(err; "Failed to rename intermediate directory");
72                }
73            }
74        }
75
76        let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
77        let store = InstrumentedStore::new(store);
78
79        Ok(Self {
80            base_dir: PathBuf::from(aux_path.as_ref()),
81            store,
82        })
83    }
84
85    /// Set the write buffer size for the store.
86    pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
87        self.store = self.store.with_write_buffer_size(write_buffer_size);
88        self
89    }
90
91    /// Returns the store to access to intermediate files.
92    pub(crate) fn store(&self) -> &InstrumentedStore {
93        &self.store
94    }
95
96    /// Returns the intermediate directory path for building fulltext index.
97    /// The format is `{aux_path}/__intm/{region_id}/{sst_file_id}/fulltext-{column_id}-{uuid}`.
98    pub(crate) fn fulltext_path(
99        &self,
100        region_id: &RegionId,
101        sst_file_id: &FileId,
102        column_id: ColumnId,
103    ) -> PathBuf {
104        let uuid = Uuid::new_v4();
105        self.base_dir
106            .join(INTERMEDIATE_DIR)
107            .join(region_id.as_u64().to_string())
108            .join(sst_file_id.to_string())
109            .join(format!("fulltext-{column_id}-{uuid}"))
110    }
111
112    /// Prunes the intermediate directory for SST files.
113    pub(crate) async fn prune_sst_dir(
114        &self,
115        region_id: &RegionId,
116        sst_file_id: &FileId,
117    ) -> Result<()> {
118        let region_id = region_id.as_u64();
119        let sst_dir = format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/");
120        self.store.remove_all(&sst_dir).await
121    }
122
123    /// Prunes the intermediate directory for region files.
124    pub(crate) async fn prune_region_dir(&self, region_id: &RegionId) -> Result<()> {
125        let region_id = region_id.as_u64();
126        let region_dir = format!("{INTERMEDIATE_DIR}/{region_id}/");
127        self.store.remove_all(&region_dir).await
128    }
129}
130
/// `IntermediateLocation` produces paths for intermediate files
/// during external sorting.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
    /// Root directory of this location, `__intm/{region_id}/{sst_file_id}/{uuid}/`
    /// (with the trailing slash), as built by `IntermediateLocation::new`.
    files_dir: String,
}
137
138impl IntermediateLocation {
139    /// Create a new `IntermediateLocation`. Set the root directory to
140    /// `__intm/{region_id}/{sst_file_id}/{uuid}/`, incorporating
141    /// uuid to differentiate active sorting files from orphaned data due to unexpected
142    /// process termination.
143    pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self {
144        let region_id = region_id.as_u64();
145        let uuid = Uuid::new_v4();
146        Self {
147            files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
148        }
149    }
150
151    /// Returns the directory to clean up when the sorting is done
152    pub fn dir_to_cleanup(&self) -> &str {
153        &self.files_dir
154    }
155
156    /// Returns the path of the directory for intermediate files associated with the `file_group`:
157    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/`
158    pub fn file_group_path(&self, file_group: &str) -> String {
159        util::join_path(&self.files_dir, &format!("{file_group}/"))
160    }
161
162    /// Returns the path of the intermediate file with the given `file_group` and `im_file_id`:
163    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im`
164    pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
165        util::join_path(
166            &self.file_group_path(file_group),
167            &format!("{im_file_id}.im"),
168        )
169    }
170
171    /// Returns the intermediate file id from the path.
172    pub fn im_file_id_from_path(&self, path: &str) -> String {
173        path.rsplit('/')
174            .next()
175            .and_then(|s| s.strip_suffix(".im"))
176            .unwrap_or_default()
177            .to_string()
178    }
179}
180
/// `TempFileProvider` implements `ExternalTempFileProvider`.
/// It uses the `InstrumentedStore` of its manager to create and read intermediate files.
pub(crate) struct TempFileProvider {
    /// Provides the paths of the intermediate files.
    location: IntermediateLocation,
    /// Provides the store used to access the intermediate files.
    manager: IntermediateManager,
}
189
190#[async_trait]
191impl ExternalTempFileProvider for TempFileProvider {
192    async fn create(
193        &self,
194        file_group: &str,
195        file_id: &str,
196    ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
197        let path = self.location.file_path(file_group, file_id);
198        let writer = self
199            .manager
200            .store()
201            .writer(
202                &path,
203                &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
204                &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
205                &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
206            )
207            .await
208            .map_err(BoxedError::new)
209            .context(index_error::ExternalSnafu)?;
210        Ok(Box::new(writer))
211    }
212
213    async fn read_all(
214        &self,
215        file_group: &str,
216    ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
217        let file_group_path = self.location.file_group_path(file_group);
218        let entries = self
219            .manager
220            .store()
221            .list(&file_group_path)
222            .await
223            .map_err(BoxedError::new)
224            .context(index_error::ExternalSnafu)?;
225        let mut readers = Vec::with_capacity(entries.len());
226
227        for entry in entries {
228            if entry.metadata().is_dir() {
229                // todo(hl): we can keep this warning once we find a way to filter self in list result.
230                debug!("Unexpected entry in index creation dir: {:?}", entry.path());
231                continue;
232            }
233
234            let im_file_id = self.location.im_file_id_from_path(entry.path());
235
236            let reader = self
237                .manager
238                .store()
239                .reader(
240                    entry.path(),
241                    &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
242                    &INDEX_INTERMEDIATE_READ_OP_TOTAL,
243                    &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
244                )
245                .await
246                .map_err(BoxedError::new)
247                .context(index_error::ExternalSnafu)?;
248            readers.push((im_file_id, Box::new(reader) as _));
249        }
250
251        Ok(readers)
252    }
253}
254
255impl TempFileProvider {
256    /// Creates a new `TempFileProvider`.
257    pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
258        Self { location, manager }
259    }
260
261    /// Removes all intermediate files.
262    pub async fn cleanup(&self) -> Result<()> {
263        self.manager
264            .store()
265            .remove_all(self.location.dir_to_cleanup())
266            .await
267    }
268}
269
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;

    use common_test_util::temp_dir;
    use futures::{AsyncReadExt, AsyncWriteExt};
    use regex::Regex;
    use store_api::storage::{FileId, RegionId};

    use super::*;

    /// Creates the intermediate file `file_id` in `file_group` through `provider`,
    /// writes `data` to it, then flushes and closes the writer.
    async fn write_im_file(
        provider: &TempFileProvider,
        file_group: &str,
        file_id: &str,
        data: &[u8],
    ) {
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(data).await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();
    }

    /// `init_fs` must get rid of an intermediate directory left by a previous run.
    #[tokio::test]
    async fn test_manager() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate");
        let path = temp_dir.path().to_str().unwrap();

        // Plant a garbage file inside the intermediate directory.
        tokio::fs::create_dir_all(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap();
        tokio::fs::write(format!("{path}/{INTERMEDIATE_DIR}/garbage.im"), "blahblah")
            .await
            .unwrap();

        let _manager = IntermediateManager::init_fs(path).await.unwrap();

        // The directory is gone once `init_fs` has returned.
        let still_exists = tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap();
        assert!(!still_exists);
    }

    /// Cleanup and pruning each remove exactly one directory level.
    #[tokio::test]
    async fn test_cleanup_dir() {
        let temp_dir = temp_dir::create_temp_dir("test_cleanup_dir_");
        let path = temp_dir.path().to_str().unwrap();

        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let mut region_dir = temp_dir.path().join(INTERMEDIATE_DIR);
        region_dir.push(region_id.as_u64().to_string());
        let sst_dir = region_dir.join(sst_file_id.to_string());

        let manager = IntermediateManager::init_fs(path).await.unwrap();
        let temp_file_provider = TempFileProvider::new(
            IntermediateLocation::new(&region_id, &sst_file_id),
            manager.clone(),
        );

        write_im_file(&temp_file_provider, "sky", "000000000000", b"hello").await;
        write_im_file(&temp_file_provider, "sky", "000000000001", b"world").await;

        temp_file_provider.cleanup().await.unwrap();

        // Provider cleanup leaves both sst_dir and region_dir in place.
        assert!(tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());

        // Pruning the SST directory removes sst_dir but keeps region_dir.
        manager
            .prune_sst_dir(&region_id, &sst_file_id)
            .await
            .unwrap();
        assert!(tokio::fs::try_exists(&region_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());

        // Pruning the region directory removes both.
        manager.prune_region_dir(&region_id).await.unwrap();
        assert!(!tokio::fs::try_exists(&sst_dir).await.unwrap());
        assert!(!tokio::fs::try_exists(&region_dir).await.unwrap());
    }

    /// Paths produced by `IntermediateLocation` follow the documented layout.
    #[test]
    fn test_intermediate_location() {
        let sst_file_id = FileId::random();
        let region_id = RegionId::new(0, 0);
        let location = IntermediateLocation::new(&region_id, &sst_file_id);

        let pattern = format!(
            "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/",
            r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
        );
        let re = Regex::new(&pattern).unwrap();
        assert!(re.is_match(&location.files_dir));

        // The uuid is the fourth `/`-separated segment of the root directory.
        let uuid = location.files_dir.split('/').nth(3).unwrap();

        let file_group = "1";
        let group_path = location.file_group_path(file_group);
        assert_eq!(
            group_path,
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/")
        );

        let im_file_id = "000000000010";
        let file_path = location.file_path(file_group, im_file_id);
        assert_eq!(
            file_path,
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im")
        );

        let parsed_id = location.im_file_id_from_path(&file_path);
        assert_eq!(parsed_id, im_file_id);
    }

    /// `fulltext_path` appends exactly four components to the auxiliary path.
    #[tokio::test]
    async fn test_fulltext_intm_path() {
        let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_");
        let aux_path = temp_dir.path().to_string_lossy().to_string();

        let manager = IntermediateManager::init_fs(&aux_path).await.unwrap();
        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let column_id = 1;
        let fulltext_path = manager.fulltext_path(&region_id, &sst_file_id, column_id);

        // The path starts with every component of the temporary directory.
        let mut pi = fulltext_path.iter();
        for expected in temp_dir.path().iter() {
            assert_eq!(expected, pi.next().unwrap());
        }

        assert_eq!(pi.next().unwrap(), INTERMEDIATE_DIR);
        assert_eq!(pi.next().unwrap(), "0"); // region id
        assert_eq!(pi.next().unwrap(), OsStr::new(&sst_file_id.to_string())); // sst file id

        // fulltext path
        let fulltext_re = Regex::new(r"fulltext-1-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}").unwrap();
        let last = pi.next().unwrap();
        assert!(fulltext_re.is_match(&last.to_string_lossy()));
        assert!(pi.next().is_none());
    }

    /// Files written through the provider are read back per file group and are
    /// gone after cleanup.
    #[tokio::test]
    async fn test_temp_file_provider_basic() {
        let temp_dir = temp_dir::create_temp_dir("intermediate");
        let path = temp_dir.path().display().to_string();

        let region_id = RegionId::new(0, 0);
        let location = IntermediateLocation::new(&region_id, &FileId::random());
        let store = IntermediateManager::init_fs(path).await.unwrap();
        let provider = TempFileProvider::new(location.clone(), store);

        write_im_file(&provider, "tag0", "0000000010", b"hello").await;
        write_im_file(&provider, "tag0", "0000000100", b"world").await;
        write_im_file(&provider, "tag1", "0000000010", b"foo").await;

        let readers = provider.read_all("tag0").await.unwrap();
        assert_eq!(readers.len(), 2);
        for (_, mut reader) in readers {
            let mut buf = Vec::new();
            reader.read_to_end(&mut buf).await.unwrap();
            assert!(matches!(buf.as_slice(), b"hello" | b"world"));
        }

        let readers = provider.read_all("tag1").await.unwrap();
        assert_eq!(readers.len(), 1);
        let (_, mut reader) = readers.into_iter().next().unwrap();
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"foo");

        provider.cleanup().await.unwrap();

        let leftovers = provider
            .manager
            .store()
            .list(location.dir_to_cleanup())
            .await
            .unwrap();
        assert!(leftovers.is_empty());
    }
}