mito2/sst/index/
intermediate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_telemetry::{debug, warn};
20use futures::{AsyncRead, AsyncWrite};
21use index::error as index_error;
22use index::error::Result as IndexResult;
23use index::external_provider::ExternalTempFileProvider;
24use object_store::util::{self, normalize_dir};
25use snafu::ResultExt;
26use store_api::storage::{ColumnId, RegionId};
27use uuid::Uuid;
28
29use crate::access_layer::new_fs_cache_store;
30use crate::error::Result;
31use crate::metrics::{
32    INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
33    INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
34    INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
35};
36use crate::sst::file::FileId;
37use crate::sst::index::store::InstrumentedStore;
38
/// Name of the directory that holds every intermediate file. It is the first
/// path component of all intermediate locations built in this module.
const INTERMEDIATE_DIR: &str = "__intm";
40
41/// `IntermediateManager` provides store to access to intermediate files.
42#[derive(Clone)]
43pub struct IntermediateManager {
44    base_dir: PathBuf,
45    store: InstrumentedStore,
46}
47
48impl IntermediateManager {
49    /// Create a new `IntermediateManager` with the given root path.
50    /// It will clean up all garbage intermediate files from previous runs.
51    pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
52        common_telemetry::info!(
53            "Initializing intermediate manager, aux_path: {}",
54            aux_path.as_ref()
55        );
56
57        let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
58        let store = InstrumentedStore::new(store);
59
60        // Remove all garbage intermediate files from previous runs.
61        if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await {
62            warn!(err; "Failed to remove garbage intermediate files");
63        }
64
65        Ok(Self {
66            base_dir: PathBuf::from(aux_path.as_ref()),
67            store,
68        })
69    }
70
71    /// Set the write buffer size for the store.
72    pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
73        self.store = self.store.with_write_buffer_size(write_buffer_size);
74        self
75    }
76
77    /// Returns the store to access to intermediate files.
78    pub(crate) fn store(&self) -> &InstrumentedStore {
79        &self.store
80    }
81
82    /// Returns the intermediate directory path for building fulltext index.
83    /// The format is `{aux_path}/__intm/{region_id}/{sst_file_id}/fulltext-{column_id}-{uuid}`.
84    pub(crate) fn fulltext_path(
85        &self,
86        region_id: &RegionId,
87        sst_file_id: &FileId,
88        column_id: ColumnId,
89    ) -> PathBuf {
90        let uuid = Uuid::new_v4();
91        self.base_dir
92            .join(INTERMEDIATE_DIR)
93            .join(region_id.as_u64().to_string())
94            .join(sst_file_id.to_string())
95            .join(format!("fulltext-{column_id}-{uuid}"))
96    }
97}
98
/// `IntermediateLocation` builds the paths of the intermediate files
/// used during external sorting.
#[derive(Clone, Debug)]
pub struct IntermediateLocation {
    /// Root directory of this location; it ends with `/` and is the directory
    /// reported by `dir_to_cleanup`.
    files_dir: String,
}
105
106impl IntermediateLocation {
107    /// Create a new `IntermediateLocation`. Set the root directory to
108    /// `__intm/{region_id}/{sst_file_id}/{uuid}/`, incorporating
109    /// uuid to differentiate active sorting files from orphaned data due to unexpected
110    /// process termination.
111    pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self {
112        let region_id = region_id.as_u64();
113        let uuid = Uuid::new_v4();
114        Self {
115            files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
116        }
117    }
118
119    /// Returns the directory to clean up when the sorting is done
120    pub fn dir_to_cleanup(&self) -> &str {
121        &self.files_dir
122    }
123
124    /// Returns the path of the directory for intermediate files associated with the `file_group`:
125    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/`
126    pub fn file_group_path(&self, file_group: &str) -> String {
127        util::join_path(&self.files_dir, &format!("{file_group}/"))
128    }
129
130    /// Returns the path of the intermediate file with the given `file_group` and `im_file_id`:
131    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im`
132    pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
133        util::join_path(
134            &self.file_group_path(file_group),
135            &format!("{im_file_id}.im"),
136        )
137    }
138
139    /// Returns the intermediate file id from the path.
140    pub fn im_file_id_from_path(&self, path: &str) -> String {
141        path.rsplit('/')
142            .next()
143            .and_then(|s| s.strip_suffix(".im"))
144            .unwrap_or_default()
145            .to_string()
146    }
147}
148
149/// `TempFileProvider` implements `ExternalTempFileProvider`.
150/// It uses `InstrumentedStore` to create and read intermediate files.
151pub(crate) struct TempFileProvider {
152    /// Provides the location of intermediate files.
153    location: IntermediateLocation,
154    /// Provides store to access to intermediate files.
155    manager: IntermediateManager,
156}
157
158#[async_trait]
159impl ExternalTempFileProvider for TempFileProvider {
160    async fn create(
161        &self,
162        file_group: &str,
163        file_id: &str,
164    ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
165        let path = self.location.file_path(file_group, file_id);
166        let writer = self
167            .manager
168            .store()
169            .writer(
170                &path,
171                &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
172                &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
173                &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
174            )
175            .await
176            .map_err(BoxedError::new)
177            .context(index_error::ExternalSnafu)?;
178        Ok(Box::new(writer))
179    }
180
181    async fn read_all(
182        &self,
183        file_group: &str,
184    ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
185        let file_group_path = self.location.file_group_path(file_group);
186        let entries = self
187            .manager
188            .store()
189            .list(&file_group_path)
190            .await
191            .map_err(BoxedError::new)
192            .context(index_error::ExternalSnafu)?;
193        let mut readers = Vec::with_capacity(entries.len());
194
195        for entry in entries {
196            if entry.metadata().is_dir() {
197                // todo(hl): we can keep this warning once we find a way to filter self in list result.
198                debug!("Unexpected entry in index creation dir: {:?}", entry.path());
199                continue;
200            }
201
202            let im_file_id = self.location.im_file_id_from_path(entry.path());
203
204            let reader = self
205                .manager
206                .store()
207                .reader(
208                    entry.path(),
209                    &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
210                    &INDEX_INTERMEDIATE_READ_OP_TOTAL,
211                    &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
212                )
213                .await
214                .map_err(BoxedError::new)
215                .context(index_error::ExternalSnafu)?;
216            readers.push((im_file_id, Box::new(reader) as _));
217        }
218
219        Ok(readers)
220    }
221}
222
223impl TempFileProvider {
224    /// Creates a new `TempFileProvider`.
225    pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
226        Self { location, manager }
227    }
228
229    /// Removes all intermediate files.
230    pub async fn cleanup(&self) -> Result<()> {
231        self.manager
232            .store()
233            .remove_all(self.location.dir_to_cleanup())
234            .await
235    }
236}
237
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;

    use common_test_util::temp_dir;
    use futures::{AsyncReadExt, AsyncWriteExt};
    use regex::Regex;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::file::FileId;

    /// `init_fs` must remove an intermediate directory left by a previous run.
    #[tokio::test]
    async fn test_manager() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate");
        let path = temp_dir.path().to_str().unwrap();
        let intm_dir = format!("{path}/{INTERMEDIATE_DIR}");

        // Plant a garbage file inside the intermediate directory.
        tokio::fs::create_dir_all(&intm_dir).await.unwrap();
        tokio::fs::write(format!("{intm_dir}/garbage.im"), "blahblah")
            .await
            .unwrap();

        let _manager = IntermediateManager::init_fs(path).await.unwrap();

        // The directory is gone after `init_fs`.
        let still_exists = tokio::fs::try_exists(&intm_dir).await.unwrap();
        assert!(!still_exists);
    }

    /// Checks the layout of every path produced by `IntermediateLocation`.
    #[test]
    fn test_intermediate_location() {
        let sst_file_id = FileId::random();
        let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id);

        let uuid_pattern = r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}";
        let re = Regex::new(&format!(
            "{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid_pattern}/"
        ))
        .unwrap();
        assert!(re.is_match(&location.files_dir));

        // Fourth path segment is the uuid generated by `new`.
        let uuid = location.files_dir.split('/').nth(3).unwrap();

        let file_group = "1";
        let expected_group_dir = format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/");
        assert_eq!(location.file_group_path(file_group), expected_group_dir);

        let im_file_id = "000000000010";
        let file_path = location.file_path(file_group, im_file_id);
        let expected_file_path = format!("{expected_group_dir}{im_file_id}.im");
        assert_eq!(file_path, expected_file_path);

        assert_eq!(location.im_file_id_from_path(&file_path), im_file_id);
    }

    /// Checks the layout of the path returned by `fulltext_path`, component by component.
    #[tokio::test]
    async fn test_fulltext_intm_path() {
        let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_");
        let aux_path = temp_dir.path().to_string_lossy().to_string();

        let manager = IntermediateManager::init_fs(&aux_path).await.unwrap();
        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let column_id = 1;
        let fulltext_path = manager.fulltext_path(&region_id, &sst_file_id, column_id);

        let mut components = fulltext_path.iter();

        // The path starts with the components of the aux path.
        for expected in temp_dir.path().iter() {
            assert_eq!(expected, components.next().unwrap());
        }

        assert_eq!(components.next().unwrap(), INTERMEDIATE_DIR);
        assert_eq!(components.next().unwrap(), "0"); // region id
        assert_eq!(
            components.next().unwrap(),
            OsStr::new(&sst_file_id.to_string())
        ); // sst file id

        let leaf_re = Regex::new(r"fulltext-1-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}").unwrap();
        let leaf = components.next().unwrap().to_string_lossy();
        assert!(leaf_re.is_match(&leaf)); // fulltext path

        assert!(components.next().is_none());
    }

    /// Writes files into two groups, reads them back per group, then cleans up.
    #[tokio::test]
    async fn test_temp_file_provider_basic() {
        let temp_dir = temp_dir::create_temp_dir("intermediate");
        let path = temp_dir.path().display().to_string();

        let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random());
        let store = IntermediateManager::init_fs(path).await.unwrap();
        let provider = TempFileProvider::new(location.clone(), store);

        // (file group, file id, content), written in this order.
        let files: [(&str, &str, &[u8]); 3] = [
            ("tag0", "0000000010", b"hello"),
            ("tag0", "0000000100", b"world"),
            ("tag1", "0000000010", b"foo"),
        ];
        for (file_group, file_id, content) in files {
            let mut writer = provider.create(file_group, file_id).await.unwrap();
            writer.write_all(content).await.unwrap();
            writer.flush().await.unwrap();
            writer.close().await.unwrap();
        }

        let tag0_readers = provider.read_all("tag0").await.unwrap();
        assert_eq!(tag0_readers.len(), 2);
        for (_, mut reader) in tag0_readers {
            let mut buf = Vec::new();
            reader.read_to_end(&mut buf).await.unwrap();
            assert!(matches!(buf.as_slice(), b"hello" | b"world"));
        }

        let tag1_readers = provider.read_all("tag1").await.unwrap();
        assert_eq!(tag1_readers.len(), 1);
        let (_, mut reader) = tag1_readers.into_iter().next().unwrap();
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"foo");

        provider.cleanup().await.unwrap();

        // Nothing is listed under the location once it has been cleaned up.
        let leftovers = provider
            .manager
            .store()
            .list(location.dir_to_cleanup())
            .await
            .unwrap();
        assert!(leftovers.is_empty());
    }
}
378}