use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::access_layer::{
    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{
    UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::sst::file::RegionFileId;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

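/// A cache that keeps SST and index files on local disk and uploads them to
/// the remote object store.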
pub struct WriteCache {
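    /// Local file cache that stores the cached files.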
    file_cache: FileCacheRef,
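    /// Factory that creates puffin managers for writing index files.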
    puffin_manager_factory: PuffinManagerFactory,
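    /// Manager for intermediate files created during index creation.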
    intermediate_manager: IntermediateManager,
}

pub type WriteCacheRef = Arc<WriteCache>;

impl WriteCache {
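    /// Creates a cache backed by `local_store` and recovers previously cached
    /// files from it.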
    pub async fn new(
        local_store: ObjectStore,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
        file_cache.recover(false).await;

        Ok(Self {
            file_cache,
            puffin_manager_factory,
            intermediate_manager,
        })
    }

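    /// Creates a cache on the local file system under `cache_dir`.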
    pub async fn new_fs(
        cache_dir: &str,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");

        let local_store = new_fs_cache_store(cache_dir).await?;
        Self::new(
            local_store,
            cache_capacity,
            ttl,
            puffin_manager_factory,
            intermediate_manager,
        )
        .await
    }

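    /// Returns the local file cache.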
    pub(crate) fn file_cache(&self) -> FileCacheRef {
        self.file_cache.clone()
    }

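    /// Writes an encoded parquet file `data` to the local cache and then
    /// uploads it to the remote object store.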
    pub(crate) async fn put_and_upload_sst(
        &self,
        data: &bytes::Bytes,
        region_id: RegionId,
        sst_info: &SstInfo,
        upload_request: SstUploadRequest,
    ) -> Result<Metrics> {
        let file_id = sst_info.file_id;
        let mut metrics = Metrics::new(WriteType::Flush);

        let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);

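        // Writes the data to the local cache file.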
        let cache_start = Instant::now();
        let cache_path = self.file_cache.cache_file_path(parquet_key);
        let mut cache_writer = self
            .file_cache
            .local_store()
            .writer(&cache_path)
            .await
            .context(crate::error::OpenDalSnafu)?;

        cache_writer
            .write(data.clone())
            .await
            .context(crate::error::OpenDalSnafu)?;
        cache_writer
            .close()
            .await
            .context(crate::error::OpenDalSnafu)?;

        let index_value = IndexValue {
            file_size: data.len() as u32,
        };
        self.file_cache.put(parquet_key, index_value).await;
        metrics.write_batch = cache_start.elapsed();

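        // Uploads the file to the remote object store.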
        let upload_start = Instant::now();
        let region_file_id = RegionFileId::new(region_id, file_id);
        let remote_path = upload_request
            .dest_path_provider
            .build_sst_file_path(region_file_id);
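        // Removes the cached file if the upload fails to avoid leaving a
        // stale entry behind.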
        if let Err(e) = self
            .upload(parquet_key, &remote_path, &upload_request.remote_store)
            .await
        {
            self.remove(parquet_key).await;
            return Err(e);
        }

        metrics.upload_parquet = upload_start.elapsed();
        Ok(metrics)
    }

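    /// Writes SST files to the local cache while building their indexes, then
    /// uploads the SST and index files to the remote object store.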
    pub(crate) async fn write_and_upload_sst(
        &self,
        write_request: SstWriteRequest,
        upload_request: SstUploadRequest,
        write_opts: &WriteOptions,
        write_type: WriteType,
    ) -> Result<(SstInfoArray, Metrics)> {
        let region_id = write_request.metadata.region_id;

        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
        let indexer = IndexerBuilderImpl {
            op_type: write_request.op_type,
            metadata: write_request.metadata.clone(),
            row_group_size: write_opts.row_group_size,
            puffin_manager: self
                .puffin_manager_factory
                .build(store.clone(), path_provider.clone()),
            intermediate_manager: self.intermediate_manager.clone(),
            index_options: write_request.index_options,
            inverted_index_config: write_request.inverted_index_config,
            fulltext_index_config: write_request.fulltext_index_config,
            bloom_filter_index_config: write_request.bloom_filter_index_config,
        };

        let cleaner = TempFileCleaner::new(region_id, store.clone());
        let mut writer = ParquetWriter::new_with_object_store(
            store.clone(),
            write_request.metadata,
            indexer,
            path_provider.clone(),
            Metrics::new(write_type),
        )
        .await
        .with_file_cleaner(cleaner);

        let sst_info = match write_request.source {
            either::Left(source) => {
                writer
                    .write_all(source, write_request.max_sequence, write_opts)
                    .await?
            }
            either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
        };
        let mut metrics = writer.into_metrics();

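        // Nothing to upload if the writer produced no SST files.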
        if sst_info.is_empty() {
            return Ok((sst_info, metrics));
        }

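        // Uploads each SST and its index file, tracking uploaded files so we
        // can clean them up if any upload fails.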
        let mut upload_tracker = UploadTracker::new(region_id);
        let mut err = None;
        let remote_store = &upload_request.remote_store;
        for sst in &sst_info {
            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
            let parquet_path = upload_request
                .dest_path_provider
                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
            let start = Instant::now();
            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
                err = Some(e);
                break;
            }
            metrics.upload_parquet += start.elapsed();
            upload_tracker.push_uploaded_file(parquet_path);

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
                let puffin_path = upload_request
                    .dest_path_provider
                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
                let start = Instant::now();
                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
                    err = Some(e);
                    break;
                }
                metrics.upload_puffin += start.elapsed();
                upload_tracker.push_uploaded_file(puffin_path);
            }
        }

        if let Some(err) = err {
            upload_tracker
                .clean(&sst_info, &self.file_cache, remote_store)
                .await;
            return Err(err);
        }

        Ok((sst_info, metrics))
    }

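    /// Removes a file from the local cache by `index_key`.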
    pub(crate) async fn remove(&self, index_key: IndexKey) {
        self.file_cache.remove(index_key).await
    }

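    /// Downloads a file in `remote_path` from the remote object store to the
    /// local cache (indexed by `index_key`), cleaning up the partially
    /// written file on failure.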
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
            .await
        {
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
                .await;

            return Err(e);
        }
        Ok(())
    }

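    /// Downloads the file into the cache without cleaning up on failure.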
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(DOWNLOAD_READER_CONCURRENCY)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.file_cache.cache_file_path(index_key);
        let mut writer = self
            .file_cache
            .local_store()
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written = futures::io::copy(reader, &mut writer)
            .await
            .context(error::DownloadSnafu {
                region_id,
                file_id,
                file_type,
            })?;
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully downloaded file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;
        Ok(())
    }

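    /// Uploads a file in the local cache (specified by `index_key`) to the
    /// remote object store at `upload_path`.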
    async fn upload(
        &self,
        index_key: IndexKey,
        upload_path: &str,
        remote_store: &ObjectStore,
    ) -> Result<()> {
        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let file_type = index_key.file_type;
        let cache_path = self.file_cache.cache_file_path(index_key);

        let start = Instant::now();
        let cached_value = self
            .file_cache
            .local_store()
            .stat(&cache_path)
            .await
            .context(error::OpenDalSnafu)?;
        let reader = self
            .file_cache
            .local_store()
            .reader(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..cached_value.content_length())
            .await
            .context(error::OpenDalSnafu)?;

        let mut writer = remote_store
            .writer_with(upload_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let bytes_written = futures::io::copy(reader, &mut writer)
            .await
            .context(error::UploadSnafu {
                region_id,
                file_id,
                file_type,
            })?;

        writer.close().await.context(error::UploadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

        debug!(
            "Successfully uploaded file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
            region_id,
            file_id,
            upload_path,
            start.elapsed(),
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;

        Ok(())
    }
}

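/// Request to upload an SST file and its index file to the remote store.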
pub struct SstUploadRequest {
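    /// Provider that builds the destination paths in the remote store.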
    pub dest_path_provider: RegionFilePathFactory,
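    /// The remote object store to upload to.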
    pub remote_store: ObjectStore,
}

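/// Tracks files uploaded to the remote store so they can be cleaned up when
/// an upload fails.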
struct UploadTracker {
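    /// Id of the region the files belong to.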
    region_id: RegionId,
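    /// Paths of the files that have been uploaded successfully.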
    files_uploaded: Vec<String>,
}

impl UploadTracker {
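    /// Creates a new tracker for the given region.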
    fn new(region_id: RegionId) -> Self {
        Self {
            region_id,
            files_uploaded: Vec::new(),
        }
    }

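    /// Records a successfully uploaded file path.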
    fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

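    /// Removes the cached files of all SSTs and deletes the uploaded files
    /// from the remote store.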
    async fn clean(
        &self,
        sst_info: &SstInfoArray,
        file_cache: &FileCacheRef,
        remote_store: &ObjectStore,
    ) {
        common_telemetry::info!(
            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
            self.region_id,
            sst_info.len()
        );

        for sst in sst_info {
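            // Removes the cached parquet and puffin files for this SST.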
            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
            file_cache.remove(parquet_key).await;

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
                file_cache.remove(puffin_key).await;
            }
        }

        for file_path in &self.files_uploaded {
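            // Cleaning is best-effort: log and continue on delete failures.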
            if let Err(e) = remote_store.delete(file_path).await {
                common_telemetry::error!(e; "Failed to delete file {}", file_path);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };

    #[tokio::test]
    async fn test_write_and_upload_sst() {
        let mut env = TestEnv::new().await;
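        // Creates a mock remote store and a local store for the write cache.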
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

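        // Builds the region metadata and the batches to write.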
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

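        // Writes to the write cache and uploads the SST to the mock remote store.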
        let (mut sst_infos, _) = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

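        // The parquet file should be in the write cache.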
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

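        // The cached data and the uploaded data should be identical.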
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

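        // The puffin index file should be cached and uploaded as well.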
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

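        // Removing the entries should evict them from the cache.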
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

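        // Creates a write cache and a cache manager that uses it.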
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

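        // Writes to the local cache and uploads the SST to the mock remote store.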
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let (mut sst_infos, _) = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

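        // Reads the parquet metadata back through a cache-enabled reader.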
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

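        // The metadata from the reader should match what the writer produced.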
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

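        // A source that fails after the first batch to abort the writer.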
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

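        // The write should fail because the source returns an error.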
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap_err();
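        // The atomic write dir should contain no leftover temporary files.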
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}