use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::access_layer::{
    new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
    WriteCachePathProvider,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{
    FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
    WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

/// A write-through cache that buffers SST files on the local store and
/// uploads them to the remote object store.
pub struct WriteCache {
    /// Local file cache.
    file_cache: FileCacheRef,
    /// Puffin manager factory for index files.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager for index files.
    intermediate_manager: IntermediateManager,
}

pub type WriteCacheRef = Arc<WriteCache>;

impl WriteCache {
    /// Creates a new write cache that caches files on `local_store` and
    /// recovers the existing cache content from disk.
    pub async fn new(
        local_store: ObjectStore,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
        file_cache.recover(false).await;

        Ok(Self {
            file_cache,
            puffin_manager_factory,
            intermediate_manager,
        })
    }

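    /// Creates a write cache backed by a local file-system directory at
    /// `cache_dir`.
    ///
    /// A minimal usage sketch (illustrative values; the index managers are
    /// assumed to be built by the caller):
    ///
    /// ```ignore
    /// let cache = WriteCache::new_fs(
    ///     "/path/to/write_cache",
    ///     ReadableSize::mb(512),
    ///     Some(Duration::from_secs(3600)),
    ///     puffin_manager_factory,
    ///     intermediate_manager,
    /// )
    /// .await?;
    /// ```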
    pub async fn new_fs(
        cache_dir: &str,
        cache_capacity: ReadableSize,
        ttl: Option<Duration>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> Result<Self> {
        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");

        let local_store = new_fs_cache_store(cache_dir).await?;
        Self::new(
            local_store,
            cache_capacity,
            ttl,
            puffin_manager_factory,
            intermediate_manager,
        )
        .await
    }

    /// Returns a reference to the underlying file cache.
    pub(crate) fn file_cache(&self) -> FileCacheRef {
        self.file_cache.clone()
    }

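    /// Writes SSTs to the local cache first, then uploads each produced file
    /// to the remote object store. If any upload fails, files written so far
    /// are cleaned up from both the cache and the remote store.
    ///
    /// A minimal call sketch (request values are illustrative; see the tests
    /// below for full request construction):
    ///
    /// ```ignore
    /// let sst_info = write_cache
    ///     .write_and_upload_sst(write_request, upload_request, &write_opts)
    ///     .await?;
    /// ```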
    pub(crate) async fn write_and_upload_sst(
        &self,
        write_request: SstWriteRequest,
        upload_request: SstUploadRequest,
        write_opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let timer = FLUSH_ELAPSED
            .with_label_values(&["write_sst"])
            .start_timer();

        let region_id = write_request.metadata.region_id;

        let store = self.file_cache.local_store();
        let path_provider = WriteCachePathProvider::new(region_id, self.file_cache.clone());
        let indexer = IndexerBuilderImpl {
            op_type: write_request.op_type,
            metadata: write_request.metadata.clone(),
            row_group_size: write_opts.row_group_size,
            puffin_manager: self
                .puffin_manager_factory
                .build(store, path_provider.clone()),
            intermediate_manager: self.intermediate_manager.clone(),
            index_options: write_request.index_options,
            inverted_index_config: write_request.inverted_index_config,
            fulltext_index_config: write_request.fulltext_index_config,
            bloom_filter_index_config: write_request.bloom_filter_index_config,
        };

        // Writes the SST to the local cache store first.
        let mut writer = ParquetWriter::new_with_object_store(
            self.file_cache.local_store(),
            write_request.metadata,
            indexer,
            path_provider,
        )
        .await;

        let sst_info = writer
            .write_all(write_request.source, write_request.max_sequence, write_opts)
            .await?;

        timer.stop_and_record();

        // Nothing is written, so there is no file to upload.
        if sst_info.is_empty() {
            return Ok(sst_info);
        }

        // Uploads the SST and index files to the remote object store.
        let mut upload_tracker = UploadTracker::new(region_id);
        let mut err = None;
        let remote_store = &upload_request.remote_store;
        for sst in &sst_info {
            let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
            let parquet_path = upload_request
                .dest_path_provider
                .build_sst_file_path(sst.file_id);
            if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
                err = Some(e);
                break;
            }
            upload_tracker.push_uploaded_file(parquet_path);

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
                let puffin_path = upload_request
                    .dest_path_provider
                    .build_index_file_path(sst.file_id);
                if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
                    err = Some(e);
                    break;
                }
                upload_tracker.push_uploaded_file(puffin_path);
            }
        }

        if let Some(err) = err {
            // Cleans up uploaded and cached files on failure.
            upload_tracker
                .clean(&sst_info, &self.file_cache, remote_store)
                .await;
            return Err(err);
        }

        Ok(sst_info)
    }

    /// Removes a file from the cache by its `index_key`.
    pub(crate) async fn remove(&self, index_key: IndexKey) {
        self.file_cache.remove(index_key).await
    }

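    /// Downloads a file in `remote_path` from the remote object store to the
    /// local cache path derived from `index_key`, then registers it in the
    /// file cache.
    ///
    /// A minimal call sketch (the key parts and `file_size` are illustrative):
    ///
    /// ```ignore
    /// let key = IndexKey::new(region_id, file_id, FileType::Parquet);
    /// write_cache
    ///     .download(key, &remote_path, &remote_store, file_size)
    ///     .await?;
    /// ```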
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(DOWNLOAD_READER_CONCURRENCY)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.file_cache.cache_file_path(index_key);
        let mut writer = self
            .file_cache
            .local_store()
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written = futures::io::copy(reader, &mut writer)
            .await
            .context(error::DownloadSnafu {
                region_id,
                file_id,
                file_type,
            })?;
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully downloaded file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;
        Ok(())
    }

    /// Uploads a file from the local cache (identified by `index_key`) to
    /// `upload_path` in the remote object store.
    async fn upload(
        &self,
        index_key: IndexKey,
        upload_path: &str,
        remote_store: &ObjectStore,
    ) -> Result<()> {
        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let file_type = index_key.file_type;
        let cache_path = self.file_cache.cache_file_path(index_key);

        let timer = FLUSH_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "upload_parquet",
                FileType::Puffin => "upload_puffin",
            }])
            .start_timer();

        let cached_value = self
            .file_cache
            .local_store()
            .stat(&cache_path)
            .await
            .context(error::OpenDalSnafu)?;
        let reader = self
            .file_cache
            .local_store()
            .reader(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..cached_value.content_length())
            .await
            .context(error::OpenDalSnafu)?;

        let mut writer = remote_store
            .writer_with(upload_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let bytes_written = futures::io::copy(reader, &mut writer)
            .await
            .context(error::UploadSnafu {
                region_id,
                file_id,
                file_type,
            })?;

        // Must close the writer to finish the upload.
        writer.close().await.context(error::UploadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        UPLOAD_BYTES_TOTAL.inc_by(bytes_written);

        debug!(
            "Successfully uploaded file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s",
            region_id,
            file_id,
            upload_path,
            timer.stop_and_record()
        );

        // Registers the uploaded file in the local cache.
        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        self.file_cache.put(index_key, index_value).await;

        Ok(())
    }
}

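/// Request to upload a built SST to the remote object store.
///
/// A minimal construction sketch (the destination path root and store are
/// illustrative; mirrors the usage in the tests below):
///
/// ```ignore
/// let upload_request = SstUploadRequest {
///     dest_path_provider: RegionFilePathFactory::new("data/".to_string()),
///     remote_store: remote_store.clone(),
/// };
/// ```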
pub struct SstUploadRequest {
    /// Destination path provider of SST files in the remote object store.
    pub dest_path_provider: RegionFilePathFactory,
    /// Remote object store to upload to.
    pub remote_store: ObjectStore,
}

/// A structure to track files uploaded to the remote object store.
struct UploadTracker {
    /// Id of the region to track.
    region_id: RegionId,
    /// Paths of files uploaded successfully.
    files_uploaded: Vec<String>,
}

impl UploadTracker {
    /// Creates a new instance of `UploadTracker` for a given region.
    fn new(region_id: RegionId) -> Self {
        Self {
            region_id,
            files_uploaded: Vec::new(),
        }
    }

    /// Adds a file path to the list of uploaded files.
    fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

    /// Cleans uploaded files and files in the file cache at best effort.
    async fn clean(
        &self,
        sst_info: &SstInfoArray,
        file_cache: &FileCacheRef,
        remote_store: &ObjectStore,
    ) {
        common_telemetry::info!(
            "Start cleaning files on upload failure, region: {}, num_ssts: {}",
            self.region_id,
            sst_info.len()
        );

        // Removes all files of the SSTs from the file cache.
        for sst in sst_info {
            let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
            file_cache.remove(parquet_key).await;

            if sst.index_metadata.file_size > 0 {
                let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
                file_cache.remove(puffin_key).await;
            }
        }

        // Deletes already uploaded files from the remote object store.
        for file_path in &self.files_uploaded {
            if let Err(e) = remote_store.delete(file_path).await {
                common_telemetry::error!(e; "Failed to delete file {}", file_path);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;
    use crate::access_layer::OperationType;
    use crate::cache::test_util::new_fs_store;
    use crate::cache::{CacheManager, CacheStrategy};
    use crate::region::options::IndexOptions;
    use crate::sst::parquet::reader::ParquetReaderBuilder;
    use crate::test_util::sst_util::{
        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
        sst_region_metadata,
    };
    use crate::test_util::TestEnv;

    #[tokio::test]
    async fn test_write_and_upload_sst() {
        // Prepares a test environment with a mock remote store and a local store.
        let mut env = TestEnv::new();
        let mock_store = env.init_object_store_manager();
        let path_provider = RegionFilePathFactory::new("test".to_string());

        let local_dir = create_temp_dir("");
        let local_store = new_fs_store(local_dir.path().to_str().unwrap());

        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;

        // Creates a source and a write request.
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };

        let upload_request = SstUploadRequest {
            dest_path_provider: path_provider.clone(),
            remote_store: mock_store.clone(),
        };

        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };

        // Writes to the cache and uploads files to the mock remote store.
        let sst_info = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let file_id = sst_info.file_id;
        let sst_upload_path = path_provider.build_sst_file_path(file_id);
        let index_upload_path = path_provider.build_index_file_path(file_id);

        // Checks the parquet file is in the write cache.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        assert!(write_cache.file_cache.contains_key(&key));

        // Checks the data uploaded to the remote store matches the cached data.
        let remote_data = mock_store.read(&sst_upload_path).await.unwrap();
        let cache_data = local_store
            .read(&write_cache.file_cache.cache_file_path(key))
            .await
            .unwrap();
        assert_eq!(remote_data.to_vec(), cache_data.to_vec());

        // Checks the index file is in the cache and uploaded correctly.
        let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
        assert!(write_cache.file_cache.contains_key(&index_key));

        let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
        let cache_index_data = local_store
            .read(&write_cache.file_cache.cache_file_path(index_key))
            .await
            .unwrap();
        assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());

        // Removes the files from the cache.
        let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache.remove(sst_index_key).await;
        assert!(!write_cache.file_cache.contains_key(&sst_index_key));
        write_cache.remove(index_key).await;
        assert!(!write_cache.file_cache.contains_key(&index_key));
    }

    #[tokio::test]
    async fn test_read_metadata_from_write_cache() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::new();
        let data_home = env.data_home().display().to_string();
        let mock_store = env.init_object_store_manager();

        let local_dir = create_temp_dir("");
        let local_path = local_dir.path().to_str().unwrap();
        let local_store = new_fs_store(local_path);

        // Creates a cache manager that holds the write cache.
        let write_cache = env
            .create_write_cache(local_store.clone(), ReadableSize::mb(10))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        // Creates a source and a write request.
        let metadata = Arc::new(sst_region_metadata());

        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata,
            source,
            storage: None,
            max_sequence: None,
            cache_manager: cache_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let write_opts = WriteOptions {
            row_group_size: 512,
            ..Default::default()
        };
        let upload_request = SstUploadRequest {
            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
            remote_store: mock_store.clone(),
        };

        // Writes the SST and remembers the parquet metadata the writer produced.
        let sst_info = write_cache
            .write_and_upload_sst(write_request, upload_request, &write_opts)
            .await
            .unwrap()
            .remove(0);
        let write_parquet_metadata = sst_info.file_metadata.unwrap();

        // Reads the metadata back through a reader that uses the cache.
        let handle = sst_file_handle_with_file_id(sst_info.file_id, 0, 1000);
        let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
            .cache(CacheStrategy::EnableAll(cache_manager.clone()));
        let reader = builder.build().await.unwrap();

        // The metadata read from the cache should match what the writer produced.
        assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
    }
}