mito2/sst/index/
intermediate.rs1use std::path::PathBuf;
16
17use async_trait::async_trait;
18use common_error::ext::BoxedError;
19use common_telemetry::{debug, warn};
20use futures::{AsyncRead, AsyncWrite};
21use index::error as index_error;
22use index::error::Result as IndexResult;
23use index::external_provider::ExternalTempFileProvider;
24use object_store::util::{self, normalize_dir};
25use snafu::ResultExt;
26use store_api::storage::{ColumnId, RegionId};
27use uuid::Uuid;
28
29use crate::access_layer::new_fs_cache_store;
30use crate::error::Result;
31use crate::metrics::{
32 INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
33 INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
34 INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
35};
36use crate::sst::file::FileId;
37use crate::sst::index::store::InstrumentedStore;
38
/// Name of the directory under which all intermediate files are placed.
const INTERMEDIATE_DIR: &str = "__intm";
40
41#[derive(Clone)]
43pub struct IntermediateManager {
44 base_dir: PathBuf,
45 store: InstrumentedStore,
46}
47
48impl IntermediateManager {
49 pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
52 common_telemetry::info!(
53 "Initializing intermediate manager, aux_path: {}",
54 aux_path.as_ref()
55 );
56
57 let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
58 let store = InstrumentedStore::new(store);
59
60 if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await {
62 warn!(err; "Failed to remove garbage intermediate files");
63 }
64
65 Ok(Self {
66 base_dir: PathBuf::from(aux_path.as_ref()),
67 store,
68 })
69 }
70
71 pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
73 self.store = self.store.with_write_buffer_size(write_buffer_size);
74 self
75 }
76
77 pub(crate) fn store(&self) -> &InstrumentedStore {
79 &self.store
80 }
81
82 pub(crate) fn fulltext_path(
85 &self,
86 region_id: &RegionId,
87 sst_file_id: &FileId,
88 column_id: ColumnId,
89 ) -> PathBuf {
90 let uuid = Uuid::new_v4();
91 self.base_dir
92 .join(INTERMEDIATE_DIR)
93 .join(region_id.as_u64().to_string())
94 .join(sst_file_id.to_string())
95 .join(format!("fulltext-{column_id}-{uuid}"))
96 }
97}
98
/// Location of the intermediate files that belong to one index creation.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
    /// Directory containing the intermediate files, with a trailing `/`.
    files_dir: String,
}
105
106impl IntermediateLocation {
107 pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self {
112 let region_id = region_id.as_u64();
113 let uuid = Uuid::new_v4();
114 Self {
115 files_dir: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
116 }
117 }
118
119 pub fn dir_to_cleanup(&self) -> &str {
121 &self.files_dir
122 }
123
124 pub fn file_group_path(&self, file_group: &str) -> String {
127 util::join_path(&self.files_dir, &format!("{file_group}/"))
128 }
129
130 pub fn file_path(&self, file_group: &str, im_file_id: &str) -> String {
133 util::join_path(
134 &self.file_group_path(file_group),
135 &format!("{im_file_id}.im"),
136 )
137 }
138
139 pub fn im_file_id_from_path(&self, path: &str) -> String {
141 path.rsplit('/')
142 .next()
143 .and_then(|s| s.strip_suffix(".im"))
144 .unwrap_or_default()
145 .to_string()
146 }
147}
148
149pub(crate) struct TempFileProvider {
152 location: IntermediateLocation,
154 manager: IntermediateManager,
156}
157
158#[async_trait]
159impl ExternalTempFileProvider for TempFileProvider {
160 async fn create(
161 &self,
162 file_group: &str,
163 file_id: &str,
164 ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
165 let path = self.location.file_path(file_group, file_id);
166 let writer = self
167 .manager
168 .store()
169 .writer(
170 &path,
171 &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
172 &INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
173 &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL,
174 )
175 .await
176 .map_err(BoxedError::new)
177 .context(index_error::ExternalSnafu)?;
178 Ok(Box::new(writer))
179 }
180
181 async fn read_all(
182 &self,
183 file_group: &str,
184 ) -> IndexResult<Vec<(String, Box<dyn AsyncRead + Unpin + Send>)>> {
185 let file_group_path = self.location.file_group_path(file_group);
186 let entries = self
187 .manager
188 .store()
189 .list(&file_group_path)
190 .await
191 .map_err(BoxedError::new)
192 .context(index_error::ExternalSnafu)?;
193 let mut readers = Vec::with_capacity(entries.len());
194
195 for entry in entries {
196 if entry.metadata().is_dir() {
197 debug!("Unexpected entry in index creation dir: {:?}", entry.path());
199 continue;
200 }
201
202 let im_file_id = self.location.im_file_id_from_path(entry.path());
203
204 let reader = self
205 .manager
206 .store()
207 .reader(
208 entry.path(),
209 &INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
210 &INDEX_INTERMEDIATE_READ_OP_TOTAL,
211 &INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
212 )
213 .await
214 .map_err(BoxedError::new)
215 .context(index_error::ExternalSnafu)?;
216 readers.push((im_file_id, Box::new(reader) as _));
217 }
218
219 Ok(readers)
220 }
221}
222
223impl TempFileProvider {
224 pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
226 Self { location, manager }
227 }
228
229 pub async fn cleanup(&self) -> Result<()> {
231 self.manager
232 .store()
233 .remove_all(self.location.dir_to_cleanup())
234 .await
235 }
236}
237
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;

    use common_test_util::temp_dir;
    use futures::{AsyncReadExt, AsyncWriteExt};
    use regex::Regex;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::file::FileId;

    /// `init_fs` removes whatever already exists under the intermediate directory.
    #[tokio::test]
    async fn test_manager() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate");
        let path = temp_dir.path().to_str().unwrap();

        // Write a garbage file before initialization.
        tokio::fs::create_dir_all(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap();
        tokio::fs::write(format!("{path}/{INTERMEDIATE_DIR}/garbage.im"), "blahblah")
            .await
            .unwrap();

        let _manager = IntermediateManager::init_fs(path).await.unwrap();

        // The intermediate directory must be gone after initialization.
        assert!(!tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap());
    }

    /// Paths derived from an `IntermediateLocation` follow the documented layout.
    #[test]
    fn test_intermediate_location() {
        let sst_file_id = FileId::random();
        let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id);

        let re = Regex::new(&format!(
            "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/",
            r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
        ))
        .unwrap();
        assert!(re.is_match(&location.files_dir));

        // The UUID is the fourth `/`-separated segment of `files_dir`.
        let uuid = location.files_dir.split('/').nth(3).unwrap();

        let file_group = "1";
        assert_eq!(
            location.file_group_path(file_group),
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/")
        );

        let im_file_id = "000000000010";
        let file_path = location.file_path(file_group, im_file_id);
        assert_eq!(
            file_path,
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{file_group}/{im_file_id}.im")
        );

        assert_eq!(location.im_file_id_from_path(&file_path), im_file_id);
    }

    /// `fulltext_path` is `{aux_path}/__intm/{region}/{sst}/fulltext-{column}-{uuid}`.
    #[tokio::test]
    async fn test_fulltext_intm_path() {
        let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_");
        let aux_path = temp_dir.path().to_string_lossy().to_string();

        let manager = IntermediateManager::init_fs(&aux_path).await.unwrap();
        let region_id = RegionId::new(0, 0);
        let sst_file_id = FileId::random();
        let column_id = 1;
        let fulltext_path = manager.fulltext_path(&region_id, &sst_file_id, column_id);

        // The path must start with every component of the auxiliary path...
        let mut pi = fulltext_path.iter();
        for a in temp_dir.path().iter() {
            assert_eq!(a, pi.next().unwrap());
        }
        // ...followed by the intermediate dir, region id, SST file id and file name.
        assert_eq!(pi.next().unwrap(), INTERMEDIATE_DIR);
        assert_eq!(pi.next().unwrap(), "0");
        assert_eq!(pi.next().unwrap(), OsStr::new(&sst_file_id.to_string()));
        assert!(Regex::new(r"fulltext-1-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}")
            .unwrap()
            .is_match(&pi.next().unwrap().to_string_lossy()));
        assert!(pi.next().is_none());
    }

    /// Files written through the provider are readable per file group and
    /// are gone after `cleanup`.
    #[tokio::test]
    async fn test_temp_file_provider_basic() {
        let temp_dir = temp_dir::create_temp_dir("intermediate");
        let path = temp_dir.path().display().to_string();

        let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random());
        let store = IntermediateManager::init_fs(path).await.unwrap();
        let provider = TempFileProvider::new(location.clone(), store);

        let file_group = "tag0";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"hello").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_id = "0000000100";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"world").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let file_group = "tag1";
        let file_id = "0000000010";
        let mut writer = provider.create(file_group, file_id).await.unwrap();
        writer.write_all(b"foo").await.unwrap();
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let readers = provider.read_all("tag0").await.unwrap();
        assert_eq!(readers.len(), 2);
        for (_, mut reader) in readers {
            let mut buf = Vec::new();
            reader.read_to_end(&mut buf).await.unwrap();
            assert!(matches!(buf.as_slice(), b"hello" | b"world"));
        }
        let readers = provider.read_all("tag1").await.unwrap();
        assert_eq!(readers.len(), 1);
        let mut reader = readers.into_iter().map(|x| x.1).next().unwrap();
        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"foo");

        provider.cleanup().await.unwrap();

        assert!(provider
            .manager
            .store()
            .list(location.dir_to_cleanup())
            .await
            .unwrap()
            .is_empty());
    }
}