1use std::sync::Arc;
18use std::time::Duration;
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26
27use crate::access_layer::{
28 new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
29 TempFileCleaner, WriteCachePathProvider,
30};
31use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
32use crate::error::{self, Result};
33use crate::metrics::{
34 FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
35 WRITE_CACHE_DOWNLOAD_ELAPSED,
36};
37use crate::sst::index::intermediate::IntermediateManager;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39use crate::sst::index::IndexerBuilderImpl;
40use crate::sst::parquet::writer::ParquetWriter;
41use crate::sst::parquet::WriteOptions;
42use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
44pub struct WriteCache {
48 file_cache: FileCacheRef,
50 puffin_manager_factory: PuffinManagerFactory,
52 intermediate_manager: IntermediateManager,
54}
55
56pub type WriteCacheRef = Arc<WriteCache>;
57
58impl WriteCache {
59 pub async fn new(
62 local_store: ObjectStore,
63 cache_capacity: ReadableSize,
64 ttl: Option<Duration>,
65 puffin_manager_factory: PuffinManagerFactory,
66 intermediate_manager: IntermediateManager,
67 ) -> Result<Self> {
68 let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
69 file_cache.recover(false).await;
70
71 Ok(Self {
72 file_cache,
73 puffin_manager_factory,
74 intermediate_manager,
75 })
76 }
77
78 pub async fn new_fs(
80 cache_dir: &str,
81 cache_capacity: ReadableSize,
82 ttl: Option<Duration>,
83 puffin_manager_factory: PuffinManagerFactory,
84 intermediate_manager: IntermediateManager,
85 ) -> Result<Self> {
86 info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
87
88 let local_store = new_fs_cache_store(cache_dir).await?;
89 Self::new(
90 local_store,
91 cache_capacity,
92 ttl,
93 puffin_manager_factory,
94 intermediate_manager,
95 )
96 .await
97 }
98
99 pub(crate) fn file_cache(&self) -> FileCacheRef {
101 self.file_cache.clone()
102 }
103
104 pub(crate) async fn write_and_upload_sst(
106 &self,
107 write_request: SstWriteRequest,
108 upload_request: SstUploadRequest,
109 write_opts: &WriteOptions,
110 ) -> Result<SstInfoArray> {
111 let timer = FLUSH_ELAPSED
112 .with_label_values(&["write_sst"])
113 .start_timer();
114
115 let region_id = write_request.metadata.region_id;
116
117 let store = self.file_cache.local_store();
118 let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone());
119 let indexer = IndexerBuilderImpl {
120 op_type: write_request.op_type,
121 metadata: write_request.metadata.clone(),
122 row_group_size: write_opts.row_group_size,
123 puffin_manager: self
124 .puffin_manager_factory
125 .build(store.clone(), path_provider.clone()),
126 intermediate_manager: self.intermediate_manager.clone(),
127 index_options: write_request.index_options,
128 inverted_index_config: write_request.inverted_index_config,
129 fulltext_index_config: write_request.fulltext_index_config,
130 bloom_filter_index_config: write_request.bloom_filter_index_config,
131 };
132
133 let cleaner = TempFileCleaner::new(region_id, store.clone());
134 let mut writer = ParquetWriter::new_with_object_store(
136 store.clone(),
137 write_request.metadata,
138 indexer,
139 path_provider.clone(),
140 )
141 .await
142 .with_file_cleaner(cleaner);
143
144 let sst_info = writer
145 .write_all(write_request.source, write_request.max_sequence, write_opts)
146 .await?;
147
148 timer.stop_and_record();
149
150 if sst_info.is_empty() {
152 return Ok(sst_info);
153 }
154
155 let mut upload_tracker = UploadTracker::new(region_id);
156 let mut err = None;
157 let remote_store = &upload_request.remote_store;
158 for sst in &sst_info {
159 let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
160 let parquet_path = upload_request
161 .dest_path_provider
162 .build_sst_file_path(sst.file_id);
163 if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
164 err = Some(e);
165 break;
166 }
167 upload_tracker.push_uploaded_file(parquet_path);
168
169 if sst.index_metadata.file_size > 0 {
170 let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
171 let puffin_path = upload_request
172 .dest_path_provider
173 .build_index_file_path(sst.file_id);
174 if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
175 err = Some(e);
176 break;
177 }
178 upload_tracker.push_uploaded_file(puffin_path);
179 }
180 }
181
182 if let Some(err) = err {
183 upload_tracker
185 .clean(&sst_info, &self.file_cache, remote_store)
186 .await;
187 return Err(err);
188 }
189
190 Ok(sst_info)
191 }
192
193 pub(crate) async fn remove(&self, index_key: IndexKey) {
195 self.file_cache.remove(index_key).await
196 }
197
198 pub(crate) async fn download(
201 &self,
202 index_key: IndexKey,
203 remote_path: &str,
204 remote_store: &ObjectStore,
205 file_size: u64,
206 ) -> Result<()> {
207 if let Err(e) = self
208 .download_without_cleaning(index_key, remote_path, remote_store, file_size)
209 .await
210 {
211 let filename = index_key.to_string();
212 TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
213 .await;
214
215 return Err(e);
216 }
217 Ok(())
218 }
219
220 async fn download_without_cleaning(
221 &self,
222 index_key: IndexKey,
223 remote_path: &str,
224 remote_store: &ObjectStore,
225 file_size: u64,
226 ) -> Result<()> {
227 const DOWNLOAD_READER_CONCURRENCY: usize = 8;
228 const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
229
230 let file_type = index_key.file_type;
231 let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
232 .with_label_values(&[match file_type {
233 FileType::Parquet => "download_parquet",
234 FileType::Puffin => "download_puffin",
235 }])
236 .start_timer();
237
238 let reader = remote_store
239 .reader_with(remote_path)
240 .concurrent(DOWNLOAD_READER_CONCURRENCY)
241 .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
242 .await
243 .context(error::OpenDalSnafu)?
244 .into_futures_async_read(0..file_size)
245 .await
246 .context(error::OpenDalSnafu)?;
247
248 let cache_path = self.file_cache.cache_file_path(index_key);
249 let mut writer = self
250 .file_cache
251 .local_store()
252 .writer(&cache_path)
253 .await
254 .context(error::OpenDalSnafu)?
255 .into_futures_async_write();
256
257 let region_id = index_key.region_id;
258 let file_id = index_key.file_id;
259 let bytes_written =
260 futures::io::copy(reader, &mut writer)
261 .await
262 .context(error::DownloadSnafu {
263 region_id,
264 file_id,
265 file_type,
266 })?;
267 writer.close().await.context(error::DownloadSnafu {
268 region_id,
269 file_id,
270 file_type,
271 })?;
272
273 WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
274
275 let elapsed = timer.stop_and_record();
276 debug!(
277 "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
278 remote_path, cache_path, bytes_written, region_id, elapsed,
279 );
280
281 let index_value = IndexValue {
282 file_size: bytes_written as _,
283 };
284 self.file_cache.put(index_key, index_value).await;
285 Ok(())
286 }
287
288 async fn upload(
290 &self,
291 index_key: IndexKey,
292 upload_path: &str,
293 remote_store: &ObjectStore,
294 ) -> Result<()> {
295 let region_id = index_key.region_id;
296 let file_id = index_key.file_id;
297 let file_type = index_key.file_type;
298 let cache_path = self.file_cache.cache_file_path(index_key);
299
300 let timer = FLUSH_ELAPSED
301 .with_label_values(&[match file_type {
302 FileType::Parquet => "upload_parquet",
303 FileType::Puffin => "upload_puffin",
304 }])
305 .start_timer();
306
307 let cached_value = self
308 .file_cache
309 .local_store()
310 .stat(&cache_path)
311 .await
312 .context(error::OpenDalSnafu)?;
313 let reader = self
314 .file_cache
315 .local_store()
316 .reader(&cache_path)
317 .await
318 .context(error::OpenDalSnafu)?
319 .into_futures_async_read(0..cached_value.content_length())
320 .await
321 .context(error::OpenDalSnafu)?;
322
323 let mut writer = remote_store
324 .writer_with(upload_path)
325 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
326 .concurrent(DEFAULT_WRITE_CONCURRENCY)
327 .await
328 .context(error::OpenDalSnafu)?
329 .into_futures_async_write();
330
331 let bytes_written =
332 futures::io::copy(reader, &mut writer)
333 .await
334 .context(error::UploadSnafu {
335 region_id,
336 file_id,
337 file_type,
338 })?;
339
340 writer.close().await.context(error::UploadSnafu {
342 region_id,
343 file_id,
344 file_type,
345 })?;
346
347 UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
348
349 debug!(
350 "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s",
351 region_id,
352 file_id,
353 upload_path,
354 timer.stop_and_record()
355 );
356
357 let index_value = IndexValue {
358 file_size: bytes_written as _,
359 };
360 self.file_cache.put(index_key, index_value).await;
362
363 Ok(())
364 }
365}
366
367pub struct SstUploadRequest {
369 pub dest_path_provider: RegionFilePathFactory,
371 pub remote_store: ObjectStore,
373}
374
375struct UploadTracker {
377 region_id: RegionId,
379 files_uploaded: Vec<String>,
381}
382
383impl UploadTracker {
384 fn new(region_id: RegionId) -> Self {
386 Self {
387 region_id,
388 files_uploaded: Vec::new(),
389 }
390 }
391
392 fn push_uploaded_file(&mut self, path: String) {
394 self.files_uploaded.push(path);
395 }
396
397 async fn clean(
399 &self,
400 sst_info: &SstInfoArray,
401 file_cache: &FileCacheRef,
402 remote_store: &ObjectStore,
403 ) {
404 common_telemetry::info!(
405 "Start cleaning files on upload failure, region: {}, num_ssts: {}",
406 self.region_id,
407 sst_info.len()
408 );
409
410 for sst in sst_info {
412 let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
413 file_cache.remove(parquet_key).await;
414
415 if sst.index_metadata.file_size > 0 {
416 let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
417 file_cache.remove(puffin_key).await;
418 }
419 }
420
421 for file_path in &self.files_uploaded {
423 if let Err(e) = remote_store.delete(file_path).await {
424 common_telemetry::error!(e; "Failed to delete file {}", file_path);
425 }
426 }
427 }
428}
429
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::TestEnv;

    /// Builds the three-batch source used by the tests that expect a
    /// successful write.
    fn three_batch_source() -> Source {
        new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ])
    }

    /// Builds a flush request over fresh SST region metadata that reads from
    /// `source` and uses `cache_manager`; every other option is the default.
    fn flush_request(source: Source, cache_manager: Arc<CacheManager>) -> SstWriteRequest {
        SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: Arc::new(sst_region_metadata()),
            source,
            storage: None,
            max_sequence: None,
            cache_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        }
    }

    /// Write options with a row group size of 512 and defaults elsewhere.
    fn small_row_group_opts() -> WriteOptions {
        WriteOptions {
            row_group_size: 512,
            ..Default::default()
        }
    }

    /// Writes an SST through the write cache and checks that both the parquet
    /// and the index file exist in the cache and in the remote store with
    /// identical content, then removes them from the cache.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string());

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        let write_request = flush_request(three_batch_source(), Default::default());
        let region_id = write_request.metadata.region_id;
        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };
        let write_opts = small_row_group_opts();

        // Write to the cache and upload the SST to the mock remote store.
        let sst_info = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let file_id = sst_info.file_id;
        let sst_upload_path = path_provider.build_sst_file_path(file_id);
        let index_upload_path = path_provider.build_index_file_path(file_id);

        // The parquet file must be cached and match the remote copy.
        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&parquet_key));

        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(parquet_key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // The index file must be cached and match the remote copy.
        let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&puffin_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(puffin_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removing the entries makes them disappear from the cache.
        write_cache.remove(parquet_key).await;
        assert!(!write_cache.file_cache.contains_key(&parquet_key));
        write_cache.remove(puffin_key).await;
        assert!(!write_cache.file_cache.contains_key(&puffin_key));
    }

    /// Writes an SST through the write cache and checks that a reader built
    /// with the same cache manager sees the same parquet metadata.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Create a cache manager that uses the write cache.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let write_request = flush_request(three_batch_source(), cache_manager.clone());
        let write_opts = small_row_group_opts();
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
            remote_store: mock_store.clone(),
        };

        let mut written = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap();
        let sst_info = written.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read the metadata back through a reader that uses the same cache manager.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let reader = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
            .cache(CacheStrategy::EnableAll(cache_manager.clone()))
            .build()
            .await
            .unwrap();

        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    /// Aborts a write with an invalid batch and checks that no regular file
    /// is left in the atomic write directory of the cache.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // The second item is an error, so the writer aborts after the first batch.
        let failing_source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        let write_request = flush_request(failing_source, cache_manager.clone());
        let write_opts = small_row_group_opts();
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
            remote_store: mock_store.clone(),
        };

        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap_err();

        // Only directories may remain in the atomic write directory.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            let is_dir = entry.file_type().await.unwrap().is_dir();
            if !is_dir {
                has_files = true;
                common_telemetry::warn!(
                    "Found remaining temporary file in atomic dir: {}",
                    entry.path().display()
                );
            }
        }

        assert!(!has_files);
    }
}