1use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26
27use crate::access_layer::{
28 FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
29 TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
30};
31use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
32use crate::error::{self, Result};
33use crate::metrics::{
34 UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
35};
36use crate::sst::file::RegionFileId;
37use crate::sst::index::IndexerBuilderImpl;
38use crate::sst::index::intermediate::IntermediateManager;
39use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
40use crate::sst::parquet::writer::ParquetWriter;
41use crate::sst::parquet::{SstInfo, WriteOptions};
42use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
/// A cache that keeps files in a local store and uploads them to a remote object store.
pub struct WriteCache {
    /// File cache that owns the local store and the index of cached files.
    file_cache: FileCacheRef,
    /// Factory used to build puffin managers on top of the local store.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager that is cloned into the index builder when writing SSTs.
    intermediate_manager: IntermediateManager,
}
55
/// Reference-counted handle to a [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;
57
58impl WriteCache {
59 pub async fn new(
62 local_store: ObjectStore,
63 cache_capacity: ReadableSize,
64 ttl: Option<Duration>,
65 puffin_manager_factory: PuffinManagerFactory,
66 intermediate_manager: IntermediateManager,
67 ) -> Result<Self> {
68 let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
69 file_cache.recover(false).await;
70
71 Ok(Self {
72 file_cache,
73 puffin_manager_factory,
74 intermediate_manager,
75 })
76 }
77
78 pub async fn new_fs(
80 cache_dir: &str,
81 cache_capacity: ReadableSize,
82 ttl: Option<Duration>,
83 puffin_manager_factory: PuffinManagerFactory,
84 intermediate_manager: IntermediateManager,
85 ) -> Result<Self> {
86 info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
87
88 let local_store = new_fs_cache_store(cache_dir).await?;
89 Self::new(
90 local_store,
91 cache_capacity,
92 ttl,
93 puffin_manager_factory,
94 intermediate_manager,
95 )
96 .await
97 }
98
99 pub(crate) fn file_cache(&self) -> FileCacheRef {
101 self.file_cache.clone()
102 }
103
104 pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
106 let store = self.file_cache.local_store();
107 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
108 self.puffin_manager_factory.build(store, path_provider)
109 }
110
111 pub(crate) async fn put_and_upload_sst(
113 &self,
114 data: &bytes::Bytes,
115 region_id: RegionId,
116 sst_info: &SstInfo,
117 upload_request: SstUploadRequest,
118 ) -> Result<Metrics> {
119 let file_id = sst_info.file_id;
120 let mut metrics = Metrics::new(WriteType::Flush);
121
122 let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
124
125 let cache_start = Instant::now();
127 let cache_path = self.file_cache.cache_file_path(parquet_key);
128 let store = self.file_cache.local_store();
129 let cleaner = TempFileCleaner::new(region_id, store.clone());
130 let write_res = store
131 .write(&cache_path, data.clone())
132 .await
133 .context(crate::error::OpenDalSnafu);
134 if let Err(e) = write_res {
135 cleaner.clean_by_file_id(file_id).await;
136 return Err(e);
137 }
138
139 metrics.write_batch = cache_start.elapsed();
140
141 let upload_start = Instant::now();
143 let region_file_id = RegionFileId::new(region_id, file_id);
144 let remote_path = upload_request
145 .dest_path_provider
146 .build_sst_file_path(region_file_id);
147
148 if let Err(e) = self
149 .upload(parquet_key, &remote_path, &upload_request.remote_store)
150 .await
151 {
152 self.remove(parquet_key).await;
154 return Err(e);
155 }
156
157 metrics.upload_parquet = upload_start.elapsed();
158 Ok(metrics)
159 }
160
161 pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
163 &self.intermediate_manager
164 }
165
166 pub(crate) async fn write_and_upload_sst(
168 &self,
169 write_request: SstWriteRequest,
170 upload_request: SstUploadRequest,
171 write_opts: &WriteOptions,
172 metrics: &mut Metrics,
173 ) -> Result<SstInfoArray> {
174 let region_id = write_request.metadata.region_id;
175
176 let store = self.file_cache.local_store();
177 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
178 let indexer = IndexerBuilderImpl {
179 build_type: write_request.op_type.into(),
180 metadata: write_request.metadata.clone(),
181 row_group_size: write_opts.row_group_size,
182 puffin_manager: self
183 .puffin_manager_factory
184 .build(store.clone(), path_provider.clone()),
185 intermediate_manager: self.intermediate_manager.clone(),
186 index_options: write_request.index_options,
187 inverted_index_config: write_request.inverted_index_config,
188 fulltext_index_config: write_request.fulltext_index_config,
189 bloom_filter_index_config: write_request.bloom_filter_index_config,
190 };
191
192 let cleaner = TempFileCleaner::new(region_id, store.clone());
193 let mut writer = ParquetWriter::new_with_object_store(
195 store.clone(),
196 write_request.metadata,
197 write_request.index_config,
198 indexer,
199 path_provider.clone(),
200 metrics,
201 )
202 .await
203 .with_file_cleaner(cleaner);
204
205 let sst_info = match write_request.source {
206 either::Left(source) => {
207 writer
208 .write_all(source, write_request.max_sequence, write_opts)
209 .await?
210 }
211 either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
212 };
213
214 if sst_info.is_empty() {
216 return Ok(sst_info);
217 }
218
219 let mut upload_tracker = UploadTracker::new(region_id);
220 let mut err = None;
221 let remote_store = &upload_request.remote_store;
222 for sst in &sst_info {
223 let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
224 let parquet_path = upload_request
225 .dest_path_provider
226 .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
227 let start = Instant::now();
228 if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
229 err = Some(e);
230 break;
231 }
232 metrics.upload_parquet += start.elapsed();
233 upload_tracker.push_uploaded_file(parquet_path);
234
235 if sst.index_metadata.file_size > 0 {
236 let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
237 let puffin_path = upload_request
238 .dest_path_provider
239 .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
240 let start = Instant::now();
241 if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
242 err = Some(e);
243 break;
244 }
245 metrics.upload_puffin += start.elapsed();
246 upload_tracker.push_uploaded_file(puffin_path);
247 }
248 }
249
250 if let Some(err) = err {
251 upload_tracker
253 .clean(&sst_info, &self.file_cache, remote_store)
254 .await;
255 return Err(err);
256 }
257
258 Ok(sst_info)
259 }
260
261 pub(crate) async fn remove(&self, index_key: IndexKey) {
263 self.file_cache.remove(index_key).await
264 }
265
266 pub(crate) async fn download(
269 &self,
270 index_key: IndexKey,
271 remote_path: &str,
272 remote_store: &ObjectStore,
273 file_size: u64,
274 ) -> Result<()> {
275 if let Err(e) = self
276 .download_without_cleaning(index_key, remote_path, remote_store, file_size)
277 .await
278 {
279 let filename = index_key.to_string();
280 TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
281 .await;
282
283 return Err(e);
284 }
285 Ok(())
286 }
287
288 async fn download_without_cleaning(
289 &self,
290 index_key: IndexKey,
291 remote_path: &str,
292 remote_store: &ObjectStore,
293 file_size: u64,
294 ) -> Result<()> {
295 const DOWNLOAD_READER_CONCURRENCY: usize = 8;
296 const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
297
298 let file_type = index_key.file_type;
299 let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
300 .with_label_values(&[match file_type {
301 FileType::Parquet => "download_parquet",
302 FileType::Puffin => "download_puffin",
303 }])
304 .start_timer();
305
306 let reader = remote_store
307 .reader_with(remote_path)
308 .concurrent(DOWNLOAD_READER_CONCURRENCY)
309 .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
310 .await
311 .context(error::OpenDalSnafu)?
312 .into_futures_async_read(0..file_size)
313 .await
314 .context(error::OpenDalSnafu)?;
315
316 let cache_path = self.file_cache.cache_file_path(index_key);
317 let mut writer = self
318 .file_cache
319 .local_store()
320 .writer(&cache_path)
321 .await
322 .context(error::OpenDalSnafu)?
323 .into_futures_async_write();
324
325 let region_id = index_key.region_id;
326 let file_id = index_key.file_id;
327 let bytes_written =
328 futures::io::copy(reader, &mut writer)
329 .await
330 .context(error::DownloadSnafu {
331 region_id,
332 file_id,
333 file_type,
334 })?;
335 writer.close().await.context(error::DownloadSnafu {
336 region_id,
337 file_id,
338 file_type,
339 })?;
340
341 WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
342
343 let elapsed = timer.stop_and_record();
344 debug!(
345 "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
346 remote_path, cache_path, bytes_written, region_id, elapsed,
347 );
348
349 let index_value = IndexValue {
350 file_size: bytes_written as _,
351 };
352 self.file_cache.put(index_key, index_value).await;
353 Ok(())
354 }
355
356 pub(crate) async fn upload(
358 &self,
359 index_key: IndexKey,
360 upload_path: &str,
361 remote_store: &ObjectStore,
362 ) -> Result<()> {
363 let region_id = index_key.region_id;
364 let file_id = index_key.file_id;
365 let file_type = index_key.file_type;
366 let cache_path = self.file_cache.cache_file_path(index_key);
367
368 let start = Instant::now();
369 let cached_value = self
370 .file_cache
371 .local_store()
372 .stat(&cache_path)
373 .await
374 .context(error::OpenDalSnafu)?;
375 let reader = self
376 .file_cache
377 .local_store()
378 .reader(&cache_path)
379 .await
380 .context(error::OpenDalSnafu)?
381 .into_futures_async_read(0..cached_value.content_length())
382 .await
383 .context(error::OpenDalSnafu)?;
384
385 let mut writer = remote_store
386 .writer_with(upload_path)
387 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
388 .concurrent(DEFAULT_WRITE_CONCURRENCY)
389 .await
390 .context(error::OpenDalSnafu)?
391 .into_futures_async_write();
392
393 let bytes_written =
394 futures::io::copy(reader, &mut writer)
395 .await
396 .context(error::UploadSnafu {
397 region_id,
398 file_id,
399 file_type,
400 })?;
401
402 writer.close().await.context(error::UploadSnafu {
404 region_id,
405 file_id,
406 file_type,
407 })?;
408
409 UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
410
411 debug!(
412 "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
413 region_id,
414 file_id,
415 upload_path,
416 start.elapsed(),
417 );
418
419 let index_value = IndexValue {
420 file_size: bytes_written as _,
421 };
422 self.file_cache.put(index_key, index_value).await;
424
425 Ok(())
426 }
427}
428
/// Describes where uploaded SST files go.
pub struct SstUploadRequest {
    /// Builds the destination paths of SST and index files in the remote store.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store that files are uploaded to.
    pub remote_store: ObjectStore,
}
436
/// Records the files uploaded for a region so they can be cleaned up on failure.
pub(crate) struct UploadTracker {
    /// Region the tracked files belong to.
    region_id: RegionId,
    /// Paths recorded through `push_uploaded_file`.
    files_uploaded: Vec<String>,
}
444
445impl UploadTracker {
446 pub(crate) fn new(region_id: RegionId) -> Self {
448 Self {
449 region_id,
450 files_uploaded: Vec::new(),
451 }
452 }
453
454 pub(crate) fn push_uploaded_file(&mut self, path: String) {
456 self.files_uploaded.push(path);
457 }
458
459 pub(crate) async fn clean(
461 &self,
462 sst_info: &SstInfoArray,
463 file_cache: &FileCacheRef,
464 remote_store: &ObjectStore,
465 ) {
466 common_telemetry::info!(
467 "Start cleaning files on upload failure, region: {}, num_ssts: {}",
468 self.region_id,
469 sst_info.len()
470 );
471
472 for sst in sst_info {
474 let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
475 file_cache.remove(parquet_key).await;
476
477 if sst.index_metadata.file_size > 0 {
478 let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
479 file_cache.remove(puffin_key).await;
480 }
481 }
482
483 for file_path in &self.files_uploaded {
485 if let Err(e) = remote_store.delete(file_path).await {
486 common_telemetry::error!(e; "Failed to delete file {}", file_path);
487 }
488 }
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use common_test_util::temp_dir::create_temp_dir;
495 use object_store::ATOMIC_WRITE_DIR;
496 use store_api::region_request::PathType;
497
498 use super::*;
499 use crate::access_layer::OperationType;
500 use crate::cache::test_util::new_fs_store;
501 use crate::cache::{CacheManager, CacheStrategy};
502 use crate::error::InvalidBatchSnafu;
503 use crate::read::Source;
504 use crate::region::options::IndexOptions;
505 use crate::sst::parquet::reader::ParquetReaderBuilder;
506 use crate::test_util::TestEnv;
507 use crate::test_util::sst_util::{
508 assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
509 sst_region_metadata,
510 };
511
512 #[tokio::test]
513 async fn test_write_and_upload_sst() {
514 let mut env = TestEnv::new().await;
517 let mock_store = env.init_object_store_manager();
518 let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);
519
520 let local_dir = create_temp_dir("");
521 let local_store = new_fs_store(local_dir.path().to_str().unwrap());
522
523 let write_cache = env
524 .create_write_cache(local_store.clone(), ReadableSize::mb(10))
525 .await;
526
527 let metadata = Arc::new(sst_region_metadata());
529 let region_id = metadata.region_id;
530 let source = new_source(&[
531 new_batch_by_range(&["a", "d"], 0, 60),
532 new_batch_by_range(&["b", "f"], 0, 40),
533 new_batch_by_range(&["b", "h"], 100, 200),
534 ]);
535
536 let write_request = SstWriteRequest {
537 op_type: OperationType::Flush,
538 metadata,
539 source: either::Left(source),
540 storage: None,
541 max_sequence: None,
542 cache_manager: Default::default(),
543 index_options: IndexOptions::default(),
544 index_config: Default::default(),
545 inverted_index_config: Default::default(),
546 fulltext_index_config: Default::default(),
547 bloom_filter_index_config: Default::default(),
548 };
549
550 let upload_request = SstUploadRequest {
551 dest_path_provider: path_provider.clone(),
552 remote_store: mock_store.clone(),
553 };
554
555 let write_opts = WriteOptions {
556 row_group_size: 512,
557 ..Default::default()
558 };
559
560 let mut metrics = Metrics::new(WriteType::Flush);
562 let mut sst_infos = write_cache
563 .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
564 .await
565 .unwrap();
566 let sst_info = sst_infos.remove(0);
567
568 let file_id = sst_info.file_id;
569 let sst_upload_path =
570 path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
571 let index_upload_path =
572 path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));
573
574 let key = IndexKey::new(region_id, file_id, FileType::Parquet);
576 assert!(write_cache.file_cache.contains_key(&key));
577
578 let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
580 let cache_data = local_store
581 .read(&write_cache.file_cache.cache_file_path(key))
582 .await
583 .unwrap();
584 assert_eq!(remote_data.to_vec(), cache_data.to_vec());
585
586 let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
588 assert!(write_cache.file_cache.contains_key(&index_key));
589
590 let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
591 let cache_index_data = local_store
592 .read(&write_cache.file_cache.cache_file_path(index_key))
593 .await
594 .unwrap();
595 assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
596
597 let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
599 write_cache.remove(sst_index_key).await;
600 assert!(!write_cache.file_cache.contains_key(&sst_index_key));
601 write_cache.remove(index_key).await;
602 assert!(!write_cache.file_cache.contains_key(&index_key));
603 }
604
605 #[tokio::test]
606 async fn test_read_metadata_from_write_cache() {
607 common_telemetry::init_default_ut_logging();
608 let mut env = TestEnv::new().await;
609 let data_home = env.data_home().display().to_string();
610 let mock_store = env.init_object_store_manager();
611
612 let local_dir = create_temp_dir("");
613 let local_path = local_dir.path().to_str().unwrap();
614 let local_store = new_fs_store(local_path);
615
616 let write_cache = env
618 .create_write_cache(local_store.clone(), ReadableSize::mb(10))
619 .await;
620 let cache_manager = Arc::new(
621 CacheManager::builder()
622 .write_cache(Some(write_cache.clone()))
623 .build(),
624 );
625
626 let metadata = Arc::new(sst_region_metadata());
628
629 let source = new_source(&[
630 new_batch_by_range(&["a", "d"], 0, 60),
631 new_batch_by_range(&["b", "f"], 0, 40),
632 new_batch_by_range(&["b", "h"], 100, 200),
633 ]);
634
635 let write_request = SstWriteRequest {
637 op_type: OperationType::Flush,
638 metadata,
639 source: either::Left(source),
640 storage: None,
641 max_sequence: None,
642 cache_manager: cache_manager.clone(),
643 index_options: IndexOptions::default(),
644 index_config: Default::default(),
645 inverted_index_config: Default::default(),
646 fulltext_index_config: Default::default(),
647 bloom_filter_index_config: Default::default(),
648 };
649 let write_opts = WriteOptions {
650 row_group_size: 512,
651 ..Default::default()
652 };
653 let upload_request = SstUploadRequest {
654 dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
655 remote_store: mock_store.clone(),
656 };
657
658 let mut metrics = Metrics::new(WriteType::Flush);
659 let mut sst_infos = write_cache
660 .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
661 .await
662 .unwrap();
663 let sst_info = sst_infos.remove(0);
664 let write_parquet_metadata = sst_info.file_metadata.unwrap();
665
666 let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
668 let builder = ParquetReaderBuilder::new(
669 data_home,
670 PathType::Bare,
671 handle.clone(),
672 mock_store.clone(),
673 )
674 .cache(CacheStrategy::EnableAll(cache_manager.clone()));
675 let reader = builder.build().await.unwrap();
676
677 assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
679 }
680
681 #[tokio::test]
682 async fn test_write_cache_clean_tmp_files() {
683 common_telemetry::init_default_ut_logging();
684 let mut env = TestEnv::new().await;
685 let data_home = env.data_home().display().to_string();
686 let mock_store = env.init_object_store_manager();
687
688 let write_cache_dir = create_temp_dir("");
689 let write_cache_path = write_cache_dir.path().to_str().unwrap();
690 let write_cache = env
691 .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
692 .await;
693
694 let cache_manager = Arc::new(
696 CacheManager::builder()
697 .write_cache(Some(write_cache.clone()))
698 .build(),
699 );
700
701 let metadata = Arc::new(sst_region_metadata());
703
704 let source = Source::Iter(Box::new(
706 [
707 Ok(new_batch_by_range(&["a", "d"], 0, 60)),
708 InvalidBatchSnafu {
709 reason: "Abort the writer",
710 }
711 .fail(),
712 ]
713 .into_iter(),
714 ));
715
716 let write_request = SstWriteRequest {
718 op_type: OperationType::Flush,
719 metadata,
720 source: either::Left(source),
721 storage: None,
722 max_sequence: None,
723 cache_manager: cache_manager.clone(),
724 index_options: IndexOptions::default(),
725 index_config: Default::default(),
726 inverted_index_config: Default::default(),
727 fulltext_index_config: Default::default(),
728 bloom_filter_index_config: Default::default(),
729 };
730 let write_opts = WriteOptions {
731 row_group_size: 512,
732 ..Default::default()
733 };
734 let upload_request = SstUploadRequest {
735 dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
736 remote_store: mock_store.clone(),
737 };
738
739 let mut metrics = Metrics::new(WriteType::Flush);
740 write_cache
741 .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
742 .await
743 .unwrap_err();
744 let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
745 let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
746 let mut has_files = false;
747 while let Some(entry) = entries.next_entry().await.unwrap() {
748 if entry.file_type().await.unwrap().is_dir() {
749 continue;
750 }
751 has_files = true;
752 common_telemetry::warn!(
753 "Found remaining temporary file in atomic dir: {}",
754 entry.path().display()
755 );
756 }
757
758 assert!(!has_files);
759 }
760}