use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::access_layer::{
    new_fs_cache_store, FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray,
    SstWriteRequest, TempFileCleaner, WriteCachePathProvider, WriteType,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{
    UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::sst::file::RegionFileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

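/// A write-through cache that keeps SST files on local disk before uploading
/// them to the remote object store.
///
/// A minimal construction sketch; the directory and capacity below are
/// hypothetical, and the factory/manager values are assumed to be created
/// elsewhere:
///
/// ```ignore
/// let write_cache = WriteCache::new_fs(
///     "/path/to/write_cache",  // local directory backing the cache
///     ReadableSize::gb(1),     // capacity before eviction kicks in
///     None,                    // optional TTL for cached files
///     puffin_manager_factory,
///     intermediate_manager,
/// )
/// .await?;
/// ```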
pub struct WriteCache {
    /// Local file cache that holds the staged files.
    file_cache: FileCacheRef,
    /// Factory for building puffin managers used to write index files.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager for intermediate files created during index build.
    intermediate_manager: IntermediateManager,
}

/// A reference-counted [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;

impl WriteCache {
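    /// Creates a new write cache backed by `local_store`, recovering any
    /// previously cached files from disk.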
    pub async fn new(
        local_store: ObjectStore,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
        file_cache.recover(false).await;

        Ok(Self {
            file_cache,
            puffin_manager_factory,
            intermediate_manager,
        })
    }

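    /// Creates a write cache that keeps files under `cache_dir` on the local
    /// file system.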
    pub async fn new_fs(
        cache_dir: &str,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");

        let local_store = new_fs_cache_store(cache_dir).await?;
        Self::new(
            local_store,
            cache_capacity,
            ttl,
            puffin_manager_factory,
            intermediate_manager,
        )
        .await
    }

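    /// Returns a reference to the underlying file cache.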
    pub(crate) fn file_cache(&self) -> FileCacheRef {
        self.file_cache.clone()
    }

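    /// Writes the SST files described by `write_request` to the local cache,
    /// then uploads them (and their index files) to the remote store described
    /// by `upload_request`. If any upload fails, every file uploaded so far is
    /// cleaned up before the error is returned.
    ///
    /// A minimal call sketch, assuming the requests are built as in the tests
    /// at the bottom of this file:
    ///
    /// ```ignore
    /// let (sst_infos, metrics) = write_cache
    ///     .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
    ///     .await?;
    /// // Each returned SST now exists both in the local cache and remotely.
    /// assert!(!sst_infos.is_empty());
    /// ```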
    pub(crate) async fn write_and_upload_sst(
        &self,
        write_request: SstWriteRequest,
        upload_request: SstUploadRequest,
        write_opts: &WriteOptions,
        write_type: WriteType,
    ) -> Result<(SstInfoArray, Metrics)> {
        let region_id = write_request.metadata.region_id;

        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
        let indexer = IndexerBuilderImpl {
            op_type: write_request.op_type,
            metadata: write_request.metadata.clone(),
            row_group_size: write_opts.row_group_size,
            puffin_manager: self
                .puffin_manager_factory
                .build(store.clone(), path_provider.clone()),
            intermediate_manager: self.intermediate_manager.clone(),
            index_options: write_request.index_options,
            inverted_index_config: write_request.inverted_index_config,
            fulltext_index_config: write_request.fulltext_index_config,
            bloom_filter_index_config: write_request.bloom_filter_index_config,
        };

        // The cleaner removes partially written files if the write fails.
        let cleaner = TempFileCleaner::new(region_id, store.clone());
        let mut writer = ParquetWriter::new_with_object_store(
            store.clone(),
            write_request.metadata,
            indexer,
            path_provider.clone(),
            Metrics::new(write_type),
        )
        .await
        .with_file_cleaner(cleaner);

        let sst_info = writer
            .write_all(write_request.source, write_request.max_sequence, write_opts)
            .await?;
        let mut metrics = writer.into_metrics();

        // Nothing to upload.
        if sst_info.is_empty() {
            return Ok((sst_info, metrics));
        }

        // Uploads each SST (and its index file, if any) to the remote store,
        // tracking uploaded paths so they can be cleaned up on failure.
        let mut upload_tracker = UploadTracker::new(region_id);
        let mut err = None;
        let remote_store = &upload_request.remote_store;
        for sst in &sst_info {
            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
            let parquet_path = upload_request
                .dest_path_provider
                .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
            let start = Instant::now();
            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
                err = Some(e);
                break;
            }
            metrics.upload_parquet += start.elapsed();
            upload_tracker.push_uploaded_file(parquet_path);

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
                let puffin_path = upload_request
                    .dest_path_provider
                    .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
                let start = Instant::now();
                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
                    err = Some(e);
                    break;
                }
                metrics.upload_puffin += start.elapsed();
                upload_tracker.push_uploaded_file(puffin_path);
            }
        }

        if let Some(err) = err {
            // Cleans up all uploaded files if any upload fails.
            upload_tracker
                .clean(&sst_info, &self.file_cache, remote_store)
                .await;
            return Err(err);
        }

        Ok((sst_info, metrics))
    }

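    /// Removes the file identified by `index_key` from the write cache.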
    pub(crate) async fn remove(&self, index_key: IndexKey) {
        self.file_cache.remove(index_key).await
    }

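    /// Downloads the file at `remote_path` in `remote_store` into the local
    /// cache under `index_key`. On failure, the partially written file in the
    /// atomic write dir is removed before the error is returned.
    ///
    /// A minimal call sketch; the identifiers below are hypothetical:
    ///
    /// ```ignore
    /// let key = IndexKey::new(region_id, file_id, FileType::Parquet);
    /// write_cache
    ///     .download(key, &remote_path, &remote_store, file_size)
    ///     .await?;
    /// ```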
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
            .await
        {
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
                .await;

            return Err(e);
        }
        Ok(())
    }

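    /// Downloads a file into the local cache without cleaning up partially
    /// written data on failure; [`WriteCache::download`] wraps this with the
    /// cleanup step.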
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin => "download_puffin",
            }])
            .start_timer();

        // Reads the remote file concurrently in fixed-size chunks.
        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(DOWNLOAD_READER_CONCURRENCY)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.file_cache.cache_file_path(index_key);
        let mut writer = self
            .file_cache
            .local_store()
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully downloaded file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;
        Ok(())
    }

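    /// Uploads a Parquet or Puffin file from the local cache to the remote
    /// object store, then registers the file in the cache index.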
    async fn upload(
        &self,
        index_key: IndexKey,
        upload_path: &str,
        remote_store: &ObjectStore,
    ) -> Result<()> {
        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let file_type = index_key.file_type;
        let cache_path = self.file_cache.cache_file_path(index_key);

        let start = Instant::now();
        let cached_value = self
            .file_cache
            .local_store()
            .stat(&cache_path)
            .await
            .context(error::OpenDalSnafu)?;
        let reader = self
            .file_cache
            .local_store()
            .reader(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..cached_value.content_length())
            .await
            .context(error::OpenDalSnafu)?;

        // Writes to the remote store with buffered, concurrent uploads.
        let mut writer = remote_store
            .writer_with(upload_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::UploadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;

        // The writer must be closed to finish the upload.
        writer.close().await.context(error::UploadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

        debug!(
            "Successfully uploaded file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
            region_id,
            file_id,
            upload_path,
            start.elapsed(),
        );

        // Registers the file in the cache index so readers can find it locally.
        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;

        Ok(())
    }
}

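/// Request to upload SST files from the write cache to the remote store.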
pub struct SstUploadRequest {
    /// Provider that builds the destination paths of the uploaded files.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store to upload to.
    pub remote_store: ObjectStore,
}

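/// Tracks files that have been uploaded so they can be cleaned up when a later
/// upload in the same batch fails.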
struct UploadTracker {
    /// Id of the region the files belong to.
    region_id: RegionId,
    /// Paths of files uploaded successfully.
    files_uploaded: Vec<String>,
}

impl UploadTracker {
    /// Creates a new tracker for the given region.
    fn new(region_id: RegionId) -> Self {
        Self {
            region_id,
            files_uploaded: Vec::new(),
        }
    }

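    /// Records a successfully uploaded file path.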
    fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

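    /// Removes the given SSTs from the file cache and deletes every file that
    /// was already uploaded to the remote store.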
    async fn clean(
        &self,
        sst_info: &SstInfoArray,
        file_cache: &FileCacheRef,
        remote_store: &ObjectStore,
    ) {
        common_telemetry::info!(
            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
            self.region_id,
            sst_info.len()
        );

        // Removes the SSTs and their index files from the local file cache.
        for sst in sst_info {
            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
            file_cache.remove(parquet_key).await;

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
                file_cache.remove(puffin_key).await;
            }
        }

        // Deletes already uploaded files from the remote store; failures are
        // logged but do not abort the cleanup.
        for file_path in &self.files_uploaded {
            if let Err(e) = remote_store.delete(file_path).await {
                common_telemetry::error!(e; "Failed to delete file {}", file_path);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::TestEnv;

    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // Prepares a mock remote store and a local write cache.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Creates a source with three batches to write.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Writes the SST to the cache and uploads it.
        let (mut sst_infos, _) = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // The parquet file should be in the local cache and match the remote copy.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // The puffin index file should also be cached and match the remote copy.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removing entries evicts them from the cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Creates a cache manager that wraps the write cache.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Writes an SST and records the parquet metadata from the write path.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let (mut sst_infos, _) = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Reads the metadata back through a reader that uses the cache.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        // The metadata from the write path and the read path should match.
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        // The source yields one valid batch and then an error to abort the writer.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        // The write must fail, and no temporary file should remain in the atomic dir.
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
            .await
            .unwrap_err();
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}