use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info};
use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};

use crate::access_layer::{
    FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::UPLOAD_BYTES_TOTAL;
use crate::region::opener::RegionLoadCacheTask;
use crate::sst::file::RegionFileId;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
43
/// A write-through cache for SST files: files are written to a local store
/// first and then uploaded to the remote object store.
pub struct WriteCache {
    /// Local file cache tracking the cached SST (parquet) and index (puffin) files.
    file_cache: FileCacheRef,
    /// Factory building puffin managers that operate through the cache's local store.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager for intermediate files produced while building indexes.
    intermediate_manager: IntermediateManager,
    /// Sender of region cache loading tasks; the matching receiver is handed to
    /// `FileCache::recover` in [`WriteCache::new`].
    task_sender: UnboundedSender<RegionLoadCacheTask>,
}
57
58pub type WriteCacheRef = Arc<WriteCache>;
59
60impl WriteCache {
61 pub async fn new(
64 local_store: ObjectStore,
65 cache_capacity: ReadableSize,
66 ttl: Option<Duration>,
67 index_cache_percent: Option<u8>,
68 puffin_manager_factory: PuffinManagerFactory,
69 intermediate_manager: IntermediateManager,
70 ) -> Result<Self> {
71 let (task_sender, task_receiver) = unbounded_channel();
72
73 let file_cache = Arc::new(FileCache::new(
74 local_store,
75 cache_capacity,
76 ttl,
77 index_cache_percent,
78 ));
79 file_cache.recover(false, Some(task_receiver)).await;
80
81 Ok(Self {
82 file_cache,
83 puffin_manager_factory,
84 intermediate_manager,
85 task_sender,
86 })
87 }
88
89 pub async fn new_fs(
91 cache_dir: &str,
92 cache_capacity: ReadableSize,
93 ttl: Option<Duration>,
94 index_cache_percent: Option<u8>,
95 puffin_manager_factory: PuffinManagerFactory,
96 intermediate_manager: IntermediateManager,
97 ) -> Result<Self> {
98 info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
99
100 let local_store = new_fs_cache_store(cache_dir).await?;
101 Self::new(
102 local_store,
103 cache_capacity,
104 ttl,
105 index_cache_percent,
106 puffin_manager_factory,
107 intermediate_manager,
108 )
109 .await
110 }
111
112 pub(crate) fn file_cache(&self) -> FileCacheRef {
114 self.file_cache.clone()
115 }
116
117 pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
119 let store = self.file_cache.local_store();
120 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
121 self.puffin_manager_factory.build(store, path_provider)
122 }
123
124 pub(crate) async fn put_and_upload_sst(
126 &self,
127 data: &bytes::Bytes,
128 region_id: RegionId,
129 sst_info: &SstInfo,
130 upload_request: SstUploadRequest,
131 ) -> Result<Metrics> {
132 let file_id = sst_info.file_id;
133 let mut metrics = Metrics::new(WriteType::Flush);
134
135 let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
137
138 let cache_start = Instant::now();
140 let cache_path = self.file_cache.cache_file_path(parquet_key);
141 let store = self.file_cache.local_store();
142 let cleaner = TempFileCleaner::new(region_id, store.clone());
143 let write_res = store
144 .write(&cache_path, data.clone())
145 .await
146 .context(crate::error::OpenDalSnafu);
147 if let Err(e) = write_res {
148 cleaner.clean_by_file_id(file_id).await;
149 return Err(e);
150 }
151
152 metrics.write_batch = cache_start.elapsed();
153
154 let upload_start = Instant::now();
156 let region_file_id = RegionFileId::new(region_id, file_id);
157 let remote_path = upload_request
158 .dest_path_provider
159 .build_sst_file_path(region_file_id);
160
161 if let Err(e) = self
162 .upload(parquet_key, &remote_path, &upload_request.remote_store)
163 .await
164 {
165 self.remove(parquet_key).await;
167 return Err(e);
168 }
169
170 metrics.upload_parquet = upload_start.elapsed();
171 Ok(metrics)
172 }
173
174 pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
176 &self.intermediate_manager
177 }
178
179 pub(crate) async fn write_and_upload_sst(
181 &self,
182 write_request: SstWriteRequest,
183 upload_request: SstUploadRequest,
184 write_opts: &WriteOptions,
185 metrics: &mut Metrics,
186 ) -> Result<SstInfoArray> {
187 let region_id = write_request.metadata.region_id;
188
189 let store = self.file_cache.local_store();
190 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
191 let indexer = IndexerBuilderImpl {
192 build_type: write_request.op_type.into(),
193 metadata: write_request.metadata.clone(),
194 row_group_size: write_opts.row_group_size,
195 puffin_manager: self
196 .puffin_manager_factory
197 .build(store.clone(), path_provider.clone()),
198 intermediate_manager: self.intermediate_manager.clone(),
199 index_options: write_request.index_options,
200 inverted_index_config: write_request.inverted_index_config,
201 fulltext_index_config: write_request.fulltext_index_config,
202 bloom_filter_index_config: write_request.bloom_filter_index_config,
203 };
204
205 let cleaner = TempFileCleaner::new(region_id, store.clone());
206 let mut writer = ParquetWriter::new_with_object_store(
208 store.clone(),
209 write_request.metadata,
210 write_request.index_config,
211 indexer,
212 path_provider.clone(),
213 metrics,
214 )
215 .await
216 .with_file_cleaner(cleaner);
217
218 let sst_info = match write_request.source {
219 either::Left(source) => {
220 writer
221 .write_all(source, write_request.max_sequence, write_opts)
222 .await?
223 }
224 either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
225 };
226
227 if sst_info.is_empty() {
229 return Ok(sst_info);
230 }
231
232 let mut upload_tracker = UploadTracker::new(region_id);
233 let mut err = None;
234 let remote_store = &upload_request.remote_store;
235 for sst in &sst_info {
236 let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
237 let parquet_path = upload_request
238 .dest_path_provider
239 .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
240 let start = Instant::now();
241 if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
242 err = Some(e);
243 break;
244 }
245 metrics.upload_parquet += start.elapsed();
246 upload_tracker.push_uploaded_file(parquet_path);
247
248 if sst.index_metadata.file_size > 0 {
249 let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
250 let puffin_path = upload_request
251 .dest_path_provider
252 .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
253 let start = Instant::now();
254 if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
255 err = Some(e);
256 break;
257 }
258 metrics.upload_puffin += start.elapsed();
259 upload_tracker.push_uploaded_file(puffin_path);
260 }
261 }
262
263 if let Some(err) = err {
264 upload_tracker
266 .clean(&sst_info, &self.file_cache, remote_store)
267 .await;
268 return Err(err);
269 }
270
271 Ok(sst_info)
272 }
273
274 pub(crate) async fn remove(&self, index_key: IndexKey) {
276 self.file_cache.remove(index_key).await
277 }
278
279 pub(crate) async fn download(
282 &self,
283 index_key: IndexKey,
284 remote_path: &str,
285 remote_store: &ObjectStore,
286 file_size: u64,
287 ) -> Result<()> {
288 self.file_cache
289 .download(index_key, remote_path, remote_store, file_size)
290 .await
291 }
292
293 pub(crate) async fn upload(
295 &self,
296 index_key: IndexKey,
297 upload_path: &str,
298 remote_store: &ObjectStore,
299 ) -> Result<()> {
300 let region_id = index_key.region_id;
301 let file_id = index_key.file_id;
302 let file_type = index_key.file_type;
303 let cache_path = self.file_cache.cache_file_path(index_key);
304
305 let start = Instant::now();
306 let cached_value = self
307 .file_cache
308 .local_store()
309 .stat(&cache_path)
310 .await
311 .context(error::OpenDalSnafu)?;
312 let reader = self
313 .file_cache
314 .local_store()
315 .reader(&cache_path)
316 .await
317 .context(error::OpenDalSnafu)?
318 .into_futures_async_read(0..cached_value.content_length())
319 .await
320 .context(error::OpenDalSnafu)?;
321
322 let mut writer = remote_store
323 .writer_with(upload_path)
324 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
325 .concurrent(DEFAULT_WRITE_CONCURRENCY)
326 .await
327 .context(error::OpenDalSnafu)?
328 .into_futures_async_write();
329
330 let bytes_written =
331 futures::io::copy(reader, &mut writer)
332 .await
333 .context(error::UploadSnafu {
334 region_id,
335 file_id,
336 file_type,
337 })?;
338
339 writer.close().await.context(error::UploadSnafu {
341 region_id,
342 file_id,
343 file_type,
344 })?;
345
346 UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
347
348 debug!(
349 "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
350 region_id,
351 file_id,
352 upload_path,
353 start.elapsed(),
354 );
355
356 let index_value = IndexValue {
357 file_size: bytes_written as _,
358 };
359 self.file_cache.put(index_key, index_value).await;
361
362 Ok(())
363 }
364
365 pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
369 let _ = self.task_sender.send(task);
370 }
371}
372
/// Request describing where an SST should be uploaded.
pub struct SstUploadRequest {
    /// Provider building the destination SST/index file paths.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store to upload the files to.
    pub remote_store: ObjectStore,
}
380
/// Tracks files successfully uploaded for a region so they can be cleaned up
/// if a later upload in the same batch fails.
pub(crate) struct UploadTracker {
    /// Region the tracked uploads belong to.
    region_id: RegionId,
    /// Remote paths of files that have been uploaded successfully.
    files_uploaded: Vec<String>,
}
388
389impl UploadTracker {
390 pub(crate) fn new(region_id: RegionId) -> Self {
392 Self {
393 region_id,
394 files_uploaded: Vec::new(),
395 }
396 }
397
398 pub(crate) fn push_uploaded_file(&mut self, path: String) {
400 self.files_uploaded.push(path);
401 }
402
403 pub(crate) async fn clean(
405 &self,
406 sst_info: &SstInfoArray,
407 file_cache: &FileCacheRef,
408 remote_store: &ObjectStore,
409 ) {
410 common_telemetry::info!(
411 "Start cleaning files on upload failure, region: {}, num_ssts: {}",
412 self.region_id,
413 sst_info.len()
414 );
415
416 for sst in sst_info {
418 let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
419 file_cache.remove(parquet_key).await;
420
421 if sst.index_metadata.file_size > 0 {
422 let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
423 file_cache.remove(puffin_key).await;
424 }
425 }
426
427 for file_path in &self.files_uploaded {
429 if let Err(e) = remote_store.delete(file_path).await {
430 common_telemetry::error!(e; "Failed to delete file {}", file_path);
431 }
432 }
433 }
434}
435
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };

    /// Writes an SST through the write cache, then checks that the parquet and
    /// puffin files exist both in the remote store and the local cache with
    /// identical contents, and that `remove` evicts the cache entries.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // Parquet file must be registered in the cache after upload.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // Remote copy and local cache copy must be byte-identical.
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Same checks for the puffin index file.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // `remove` should evict both entries from the cache index.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    /// Writes an SST through the write cache, then reads its parquet metadata
    /// back through a cache-enabled reader and checks it matches the metadata
    /// produced at write time.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Build a reader with caching enabled and compare its metadata against
        // the metadata captured during the write.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    /// Aborts a write mid-way via a failing source and verifies the cleaner
    /// leaves no temporary files in the local store's atomic write dir.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        // Source yields one good batch then an error to abort the writer.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap_err();
        // The atomic write dir must contain no leftover temp files.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}