1use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use common_base::readable_size::ReadableSize;
21use common_telemetry::{debug, info};
22use futures::AsyncWriteExt;
23use object_store::ObjectStore;
24use snafu::ResultExt;
25use store_api::storage::RegionId;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27
28use crate::access_layer::{
29 FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
30 TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
31};
32use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
33use crate::cache::manifest_cache::ManifestCache;
34use crate::error::{self, Result};
35use crate::metrics::UPLOAD_BYTES_TOTAL;
36use crate::region::opener::RegionLoadCacheTask;
37use crate::sst::file::RegionFileId;
38use crate::sst::index::IndexerBuilderImpl;
39use crate::sst::index::intermediate::IntermediateManager;
40use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
41use crate::sst::parquet::writer::ParquetWriter;
42use crate::sst::parquet::{SstInfo, WriteOptions};
43use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
44
/// A cache that stages SST files in a local store and uploads them to a remote store.
///
/// Files are written to (or copied into) the local file cache first and then streamed
/// to the remote object store; successfully uploaded files stay registered in the
/// local file cache.
pub struct WriteCache {
    /// Local file cache holding parquet and puffin files, keyed by [`IndexKey`].
    file_cache: FileCacheRef,
    /// Factory used to build puffin managers over the local cache store.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager handed to the index builder when writing SSTs.
    intermediate_manager: IntermediateManager,
    /// Sending half of the region cache-loading channel; the receiving half is passed
    /// to `FileCache::recover` when the cache is constructed.
    task_sender: UnboundedSender<RegionLoadCacheTask>,
    /// Optional manifest cache; `new_fs` only creates it when its capacity is non-zero.
    manifest_cache: Option<ManifestCache>,
}

/// Shared, reference-counted handle to a [`WriteCache`].
pub type WriteCacheRef = Arc<WriteCache>;
62
63impl WriteCache {
64 pub async fn new(
67 local_store: ObjectStore,
68 cache_capacity: ReadableSize,
69 ttl: Option<Duration>,
70 index_cache_percent: Option<u8>,
71 puffin_manager_factory: PuffinManagerFactory,
72 intermediate_manager: IntermediateManager,
73 manifest_cache: Option<ManifestCache>,
74 ) -> Result<Self> {
75 let (task_sender, task_receiver) = unbounded_channel();
76
77 let file_cache = Arc::new(FileCache::new(
78 local_store,
79 cache_capacity,
80 ttl,
81 index_cache_percent,
82 ));
83 file_cache.recover(false, Some(task_receiver)).await;
84
85 Ok(Self {
86 file_cache,
87 puffin_manager_factory,
88 intermediate_manager,
89 task_sender,
90 manifest_cache,
91 })
92 }
93
94 pub async fn new_fs(
96 cache_dir: &str,
97 cache_capacity: ReadableSize,
98 ttl: Option<Duration>,
99 index_cache_percent: Option<u8>,
100 puffin_manager_factory: PuffinManagerFactory,
101 intermediate_manager: IntermediateManager,
102 manifest_cache_capacity: ReadableSize,
103 ) -> Result<Self> {
104 info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
105
106 let local_store = new_fs_cache_store(cache_dir).await?;
107
108 let manifest_cache = if manifest_cache_capacity.as_bytes() > 0 {
110 Some(ManifestCache::new(local_store.clone(), manifest_cache_capacity, ttl).await)
111 } else {
112 None
113 };
114
115 Self::new(
116 local_store,
117 cache_capacity,
118 ttl,
119 index_cache_percent,
120 puffin_manager_factory,
121 intermediate_manager,
122 manifest_cache,
123 )
124 .await
125 }
126
127 pub(crate) fn file_cache(&self) -> FileCacheRef {
129 self.file_cache.clone()
130 }
131
132 pub(crate) fn manifest_cache(&self) -> Option<ManifestCache> {
134 self.manifest_cache.clone()
135 }
136
137 pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
139 let store = self.file_cache.local_store();
140 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
141 self.puffin_manager_factory.build(store, path_provider)
142 }
143
144 pub(crate) async fn put_and_upload_sst(
146 &self,
147 data: &bytes::Bytes,
148 region_id: RegionId,
149 sst_info: &SstInfo,
150 upload_request: SstUploadRequest,
151 ) -> Result<Metrics> {
152 let file_id = sst_info.file_id;
153 let mut metrics = Metrics::new(WriteType::Flush);
154
155 let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
157
158 let cache_start = Instant::now();
160 let cache_path = self.file_cache.cache_file_path(parquet_key);
161 let store = self.file_cache.local_store();
162 let cleaner = TempFileCleaner::new(region_id, store.clone());
163 let write_res = store
164 .write(&cache_path, data.clone())
165 .await
166 .context(crate::error::OpenDalSnafu);
167 if let Err(e) = write_res {
168 cleaner.clean_by_file_id(file_id).await;
169 return Err(e);
170 }
171
172 metrics.write_batch = cache_start.elapsed();
173
174 let upload_start = Instant::now();
176 let region_file_id = RegionFileId::new(region_id, file_id);
177 let remote_path = upload_request
178 .dest_path_provider
179 .build_sst_file_path(region_file_id);
180
181 if let Err(e) = self
182 .upload(parquet_key, &remote_path, &upload_request.remote_store)
183 .await
184 {
185 self.remove(parquet_key).await;
187 return Err(e);
188 }
189
190 metrics.upload_parquet = upload_start.elapsed();
191 Ok(metrics)
192 }
193
194 pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
196 &self.intermediate_manager
197 }
198
199 pub(crate) async fn write_and_upload_sst(
201 &self,
202 write_request: SstWriteRequest,
203 upload_request: SstUploadRequest,
204 write_opts: &WriteOptions,
205 metrics: &mut Metrics,
206 ) -> Result<SstInfoArray> {
207 let region_id = write_request.metadata.region_id;
208
209 let store = self.file_cache.local_store();
210 let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
211 let indexer = IndexerBuilderImpl {
212 build_type: write_request.op_type.into(),
213 metadata: write_request.metadata.clone(),
214 row_group_size: write_opts.row_group_size,
215 puffin_manager: self
216 .puffin_manager_factory
217 .build(store.clone(), path_provider.clone()),
218 write_cache_enabled: true,
219 intermediate_manager: self.intermediate_manager.clone(),
220 index_options: write_request.index_options,
221 inverted_index_config: write_request.inverted_index_config,
222 fulltext_index_config: write_request.fulltext_index_config,
223 bloom_filter_index_config: write_request.bloom_filter_index_config,
224 };
225
226 let cleaner = TempFileCleaner::new(region_id, store.clone());
227 let mut writer = ParquetWriter::new_with_object_store(
229 store.clone(),
230 write_request.metadata,
231 write_request.index_config,
232 indexer,
233 path_provider.clone(),
234 metrics,
235 )
236 .await
237 .with_file_cleaner(cleaner);
238
239 let sst_info = match write_request.source {
240 either::Left(source) => {
241 writer
242 .write_all(source, write_request.max_sequence, write_opts)
243 .await?
244 }
245 either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
246 };
247
248 if sst_info.is_empty() {
250 return Ok(sst_info);
251 }
252
253 let mut upload_tracker = UploadTracker::new(region_id);
254 let mut err = None;
255 let remote_store = &upload_request.remote_store;
256 for sst in &sst_info {
257 let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
258 let parquet_path = upload_request
259 .dest_path_provider
260 .build_sst_file_path(RegionFileId::new(region_id, sst.file_id));
261 let start = Instant::now();
262 if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
263 err = Some(e);
264 break;
265 }
266 metrics.upload_parquet += start.elapsed();
267 upload_tracker.push_uploaded_file(parquet_path);
268
269 if sst.index_metadata.file_size > 0 {
270 let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin(0));
271 let puffin_path = upload_request
272 .dest_path_provider
273 .build_index_file_path(RegionFileId::new(region_id, sst.file_id));
274 let start = Instant::now();
275 if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
276 err = Some(e);
277 break;
278 }
279 metrics.upload_puffin += start.elapsed();
280 upload_tracker.push_uploaded_file(puffin_path);
281 }
282 }
283
284 if let Some(err) = err {
285 upload_tracker
287 .clean(&sst_info, &self.file_cache, remote_store)
288 .await;
289 return Err(err);
290 }
291
292 Ok(sst_info)
293 }
294
295 pub(crate) async fn remove(&self, index_key: IndexKey) {
297 self.file_cache.remove(index_key).await
298 }
299
300 pub(crate) async fn download(
303 &self,
304 index_key: IndexKey,
305 remote_path: &str,
306 remote_store: &ObjectStore,
307 file_size: u64,
308 ) -> Result<()> {
309 self.file_cache
310 .download(index_key, remote_path, remote_store, file_size)
311 .await
312 }
313
314 pub(crate) async fn upload(
316 &self,
317 index_key: IndexKey,
318 upload_path: &str,
319 remote_store: &ObjectStore,
320 ) -> Result<()> {
321 let region_id = index_key.region_id;
322 let file_id = index_key.file_id;
323 let file_type = index_key.file_type;
324 let cache_path = self.file_cache.cache_file_path(index_key);
325
326 let start = Instant::now();
327 let cached_value = self
328 .file_cache
329 .local_store()
330 .stat(&cache_path)
331 .await
332 .context(error::OpenDalSnafu)?;
333 let reader = self
334 .file_cache
335 .local_store()
336 .reader(&cache_path)
337 .await
338 .context(error::OpenDalSnafu)?
339 .into_futures_async_read(0..cached_value.content_length())
340 .await
341 .context(error::OpenDalSnafu)?;
342
343 let mut writer = remote_store
344 .writer_with(upload_path)
345 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
346 .concurrent(DEFAULT_WRITE_CONCURRENCY)
347 .await
348 .context(error::OpenDalSnafu)?
349 .into_futures_async_write();
350
351 let bytes_written =
352 futures::io::copy(reader, &mut writer)
353 .await
354 .context(error::UploadSnafu {
355 region_id,
356 file_id,
357 file_type,
358 })?;
359
360 writer.close().await.context(error::UploadSnafu {
362 region_id,
363 file_id,
364 file_type,
365 })?;
366
367 UPLOAD_BYTES_TOTAL.inc_by(bytes_written);
368
369 debug!(
370 "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}",
371 region_id,
372 file_id,
373 upload_path,
374 start.elapsed(),
375 );
376
377 let index_value = IndexValue {
378 file_size: bytes_written as _,
379 };
380 self.file_cache.put(index_key, index_value).await;
382
383 Ok(())
384 }
385
386 pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
390 let _ = self.task_sender.send(task);
391 }
392}
393
/// Describes where SST files should be uploaded.
pub struct SstUploadRequest {
    /// Builds the remote parquet and index file paths for a region file.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store that receives the uploaded files.
    pub remote_store: ObjectStore,
}
401
402pub(crate) struct UploadTracker {
404 region_id: RegionId,
406 files_uploaded: Vec<String>,
408}
409
410impl UploadTracker {
411 pub(crate) fn new(region_id: RegionId) -> Self {
413 Self {
414 region_id,
415 files_uploaded: Vec::new(),
416 }
417 }
418
419 pub(crate) fn push_uploaded_file(&mut self, path: String) {
421 self.files_uploaded.push(path);
422 }
423
424 pub(crate) async fn clean(
426 &self,
427 sst_info: &SstInfoArray,
428 file_cache: &FileCacheRef,
429 remote_store: &ObjectStore,
430 ) {
431 common_telemetry::info!(
432 "Start cleaning files on upload failure, region: {}, num_ssts: {}",
433 self.region_id,
434 sst_info.len()
435 );
436
437 for sst in sst_info {
439 let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
440 file_cache.remove(parquet_key).await;
441
442 if sst.index_metadata.file_size > 0 {
443 let puffin_key = IndexKey::new(
444 self.region_id,
445 sst.file_id,
446 FileType::Puffin(sst.index_metadata.version),
447 );
448 file_cache.remove(puffin_key).await;
449 }
450 }
451
452 for file_path in &self.files_uploaded {
454 if let Err(e) = remote_store.delete(file_path).await {
455 common_telemetry::error!(e; "Failed to delete file {}", file_path);
456 }
457 }
458 }
459}
460
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ATOMIC_WRITE_DIR;
    use store_api::region_request::PathType;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::error::InvalidBatchSnafu;
    use crate::read::Source;
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };

    /// Writes an SST through the write cache and checks that the parquet and puffin files
    /// are both cached locally and uploaded remotely with identical contents. It then checks
    /// that `remove` evicts them from the file cache.
    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // `mock_store` acts as the remote store; `local_store` backs the write cache.
        let mut env = TestEnv::new().await;
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string(), PathType::Bare);

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Build a source with a few batches to write.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Write to the local cache and upload to the remote store.
        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);

        let file_id = sst_info.file_id;
        let sst_upload_path =
            path_provider.build_sst_file_path(RegionFileId::new(region_id, file_id));
        let index_upload_path =
            path_provider.build_index_file_path(RegionFileId::new(region_id, file_id));

        // The parquet file must be registered in the file cache.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // The remote parquet file must match the locally cached copy byte-for-byte.
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // The puffin (index) file must be registered in the file cache as version 0.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin(0));
        assert!(write_cache.file_cache.contains_key(&index_key));

        // The remote index file must match the locally cached copy byte-for-byte.
        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removing each key must evict it from the file cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    /// Checks that parquet metadata read back via the write-cache-enabled cache manager
    /// equals the metadata reported by the writer.
    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Register the write cache with a cache manager so the reader can use it.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Build a source with a few batches to write.
        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        // Write to the local cache and upload under `data_home` on the remote store.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        let mut metrics = Metrics::new(WriteType::Flush);
        let mut sst_infos = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
        let sst_info = sst_infos.remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Read the SST back with all caches enabled and compare the parquet metadata.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(
            data_home,
            PathType::Bare,
            handle.clone(),
            mock_store.clone(),
        )
        .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }

    /// Checks that a writer aborted by a failing source leaves no temporary files behind
    /// in the write cache's atomic-write directory.
    #[tokio::test]
    async fn test_write_cache_clean_tmp_files() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new().await;
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let write_cache_dir = create_temp_dir("");
        let write_cache_path = write_cache_dir.path().to_str().unwrap();
        let write_cache = env
            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
            .await;

        // Register the write cache with a cache manager.
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let metadata = Arc::new(sst_region_metadata());

        // The source yields one valid batch and then an error, aborting the writer mid-write.
        let source = Source::Iter(Box::new(
            [
                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
                InvalidBatchSnafu {
                    reason: "Abort the writer",
                }
                .fail(),
            ]
            .into_iter(),
        ));

        // Attempt to write to the local cache and upload to the remote store.
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            index_config: Default::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone(), PathType::Bare),
            remote_store: mock_store.clone(),
        };

        // The write must fail because of the injected error.
        let mut metrics = Metrics::new(WriteType::Flush);
        write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap_err();
        // Scan the atomic-write directory; only sub-directories may remain, not files.
        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
        let mut has_files = false;
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.file_type().await.unwrap().is_dir() {
                continue;
            }
            has_files = true;
            common_telemetry::warn!(
                "Found remaining temporary file in atomic dir: {}",
                entry.path().display()
            );
        }

        assert!(!has_files);
    }
}