1use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use async_stream::try_stream;
19use common_time::Timestamp;
20use futures::{Stream, TryStreamExt};
21use object_store::services::Fs;
22use object_store::util::{join_dir, with_instrument_layers};
23use object_store::{ATOMIC_WRITE_DIR, ErrorKind, OLD_ATOMIC_WRITE_DIR, ObjectStore};
24use smallvec::SmallVec;
25use snafu::ResultExt;
26use store_api::metadata::RegionMetadataRef;
27use store_api::region_request::PathType;
28use store_api::sst_entry::StorageSstEntry;
29use store_api::storage::{FileId, RegionId, SequenceNumber};
30
31use crate::cache::CacheManagerRef;
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::write_cache::SstUploadRequest;
34use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
35use crate::error::{
36 CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result,
37};
38use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
39use crate::read::FlatSource;
40use crate::region::options::IndexOptions;
41use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
42use crate::sst::index::IndexerBuilderImpl;
43use crate::sst::index::intermediate::IntermediateManager;
44use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
45use crate::sst::location::{self, region_dir_from_table_dir};
46use crate::sst::parquet::reader::ParquetReaderBuilder;
47use crate::sst::parquet::writer::ParquetWriter;
48use crate::sst::parquet::{SstInfo, WriteOptions};
49use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FormatType};
50
/// Shared, reference-counted handle to an [`AccessLayer`].
pub type AccessLayerRef = Arc<AccessLayer>;
/// List of SST infos produced by a write; inline storage for the common 1-2 file case.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
54
/// The write path that produced an SST; selects which metric family
/// (`FLUSH_ELAPSED` vs. `COMPACTION_STAGE_ELAPSED`) receives the timings.
#[derive(Eq, PartialEq, Debug)]
pub enum WriteType {
    // SST written while flushing memtables.
    Flush,
    // SST written by a compaction task.
    Compaction,
}
63
/// Stage timings collected while writing an SST; merged across writers and
/// reported to histograms via [`Metrics::observe`].
#[derive(Debug)]
pub struct Metrics {
    // Which write path (flush/compaction) these timings belong to.
    pub(crate) write_type: WriteType,
    // Time spent pulling batches from the source.
    pub(crate) iter_source: Duration,
    // Time spent writing batches to the SST file.
    pub(crate) write_batch: Duration,
    // Time spent building/updating index data.
    pub(crate) update_index: Duration,
    // Time spent uploading the parquet file.
    pub(crate) upload_parquet: Duration,
    // Time spent uploading the puffin (index) file.
    pub(crate) upload_puffin: Duration,
    // Time spent compacting memtables (flush only; observed when non-zero).
    pub(crate) compact_memtable: Duration,
}
74
75impl Metrics {
76 pub fn new(write_type: WriteType) -> Self {
77 Self {
78 write_type,
79 iter_source: Default::default(),
80 write_batch: Default::default(),
81 update_index: Default::default(),
82 upload_parquet: Default::default(),
83 upload_puffin: Default::default(),
84 compact_memtable: Default::default(),
85 }
86 }
87
88 pub(crate) fn merge(mut self, other: Self) -> Self {
89 assert_eq!(self.write_type, other.write_type);
90 self.iter_source += other.iter_source;
91 self.write_batch += other.write_batch;
92 self.update_index += other.update_index;
93 self.upload_parquet += other.upload_parquet;
94 self.upload_puffin += other.upload_puffin;
95 self.compact_memtable += other.compact_memtable;
96 self
97 }
98
99 pub(crate) fn observe(self) {
100 match self.write_type {
101 WriteType::Flush => {
102 FLUSH_ELAPSED
103 .with_label_values(&["iter_source"])
104 .observe(self.iter_source.as_secs_f64());
105 FLUSH_ELAPSED
106 .with_label_values(&["write_batch"])
107 .observe(self.write_batch.as_secs_f64());
108 FLUSH_ELAPSED
109 .with_label_values(&["update_index"])
110 .observe(self.update_index.as_secs_f64());
111 FLUSH_ELAPSED
112 .with_label_values(&["upload_parquet"])
113 .observe(self.upload_parquet.as_secs_f64());
114 FLUSH_ELAPSED
115 .with_label_values(&["upload_puffin"])
116 .observe(self.upload_puffin.as_secs_f64());
117 if !self.compact_memtable.is_zero() {
118 FLUSH_ELAPSED
119 .with_label_values(&["compact_memtable"])
120 .observe(self.upload_puffin.as_secs_f64());
121 }
122 }
123 WriteType::Compaction => {
124 COMPACTION_STAGE_ELAPSED
125 .with_label_values(&["iter_source"])
126 .observe(self.iter_source.as_secs_f64());
127 COMPACTION_STAGE_ELAPSED
128 .with_label_values(&["write_batch"])
129 .observe(self.write_batch.as_secs_f64());
130 COMPACTION_STAGE_ELAPSED
131 .with_label_values(&["update_index"])
132 .observe(self.update_index.as_secs_f64());
133 COMPACTION_STAGE_ELAPSED
134 .with_label_values(&["upload_parquet"])
135 .observe(self.upload_parquet.as_secs_f64());
136 COMPACTION_STAGE_ELAPSED
137 .with_label_values(&["upload_puffin"])
138 .observe(self.upload_puffin.as_secs_f64());
139 }
140 };
141 }
142}
143
/// Storage-access layer for a table: builds SST/index paths under `table_dir`,
/// and reads, writes and deletes SST and puffin files on the object store.
pub struct AccessLayer {
    // Root directory of the table on the object store.
    table_dir: String,
    // Controls how region paths are derived from the table dir.
    path_type: PathType,
    // Backing object store for all file operations.
    object_store: ObjectStore,
    // Factory for puffin (index file) managers.
    puffin_manager_factory: PuffinManagerFactory,
    // Manages intermediate files used while building indexes.
    intermediate_manager: IntermediateManager,
}
156
157impl std::fmt::Debug for AccessLayer {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("AccessLayer")
160 .field("table_dir", &self.table_dir)
161 .finish()
162 }
163}
164
impl AccessLayer {
    /// Creates an [`AccessLayer`] rooted at `table_dir` on the given object store.
    pub fn new(
        table_dir: impl Into<String>,
        path_type: PathType,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> AccessLayer {
        AccessLayer {
            table_dir: table_dir.into(),
            path_type,
            object_store,
            puffin_manager_factory,
            intermediate_manager,
        }
    }

    /// Returns the table directory this layer operates under.
    pub fn table_dir(&self) -> &str {
        &self.table_dir
    }

    /// Returns the underlying object store.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }

    /// Returns the path type used to derive region paths.
    pub fn path_type(&self) -> PathType {
        self.path_type
    }

    /// Returns the puffin manager factory.
    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        &self.puffin_manager_factory
    }

    /// Returns the intermediate manager used for index building.
    pub fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Builds a puffin manager whose paths are resolved through this layer's
    /// table dir and path type.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.object_store.clone();
        let path_provider =
            RegionFilePathFactory::new(self.table_dir().to_string(), self.path_type());
        self.puffin_manager_factory.build(store, path_provider)
    }

    /// Deletes the versioned index (puffin) file identified by `index_file_id`
    /// from the object store.
    pub(crate) async fn delete_index(
        &self,
        index_file_id: RegionIndexId,
    ) -> Result<(), crate::error::Error> {
        let path = location::index_file_path(
            &self.table_dir,
            RegionIndexId::new(index_file_id.file_id, index_file_id.version),
            self.path_type,
        );
        self.object_store
            .delete(&path)
            .await
            .context(DeleteIndexSnafu {
                file_id: index_file_id.file_id(),
            })?;
        Ok(())
    }

    /// Batch-deletes the SST files of `region_id` listed in `file_ids`.
    /// No-op when `file_ids` is empty.
    pub(crate) async fn delete_ssts(
        &self,
        region_id: RegionId,
        file_ids: &[FileId],
    ) -> Result<(), crate::error::Error> {
        if file_ids.is_empty() {
            return Ok(());
        }

        // Keep the attempted ids for error context.
        let attempted_files = file_ids.to_vec();
        let paths: Vec<_> = file_ids
            .iter()
            .map(|file_id| {
                location::sst_file_path(
                    &self.table_dir,
                    RegionFileId::new(region_id, *file_id),
                    self.path_type,
                )
            })
            .collect();

        let mut deleter = self
            .object_store
            .deleter()
            .await
            .with_context(|_| DeleteSstsSnafu {
                region_id,
                file_ids: attempted_files.clone(),
            })?;
        deleter
            .delete_iter(paths.iter().map(String::as_str))
            .await
            .with_context(|_| DeleteSstsSnafu {
                region_id,
                file_ids: attempted_files.clone(),
            })?;
        // Close flushes the batched deletes.
        deleter.close().await.with_context(|_| DeleteSstsSnafu {
            region_id,
            file_ids: attempted_files,
        })?;

        Ok(())
    }

    /// Batch-deletes the index (puffin) files listed in `index_ids`.
    /// No-op when `index_ids` is empty.
    pub(crate) async fn delete_indexes(
        &self,
        index_ids: &[RegionIndexId],
    ) -> Result<(), crate::error::Error> {
        if index_ids.is_empty() {
            return Ok(());
        }

        // File ids kept only for error context.
        let file_ids: Vec<_> = index_ids
            .iter()
            .map(|index_id| index_id.file_id())
            .collect();
        let paths: Vec<_> = index_ids
            .iter()
            .map(|index_id| location::index_file_path(&self.table_dir, *index_id, self.path_type))
            .collect();

        let mut deleter = self
            .object_store
            .deleter()
            .await
            .context(DeleteIndexesSnafu {
                file_ids: file_ids.clone(),
            })?;
        deleter
            .delete_iter(paths.iter().map(String::as_str))
            .await
            .context(DeleteIndexesSnafu {
                file_ids: file_ids.clone(),
            })?;
        // Close flushes the batched deletes.
        deleter
            .close()
            .await
            .context(DeleteIndexesSnafu { file_ids })?;

        Ok(())
    }

    /// Returns the directory of `region_id` under this layer's table dir.
    pub fn build_region_dir(&self, region_id: RegionId) -> String {
        region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
    }

    /// Returns a reader builder for the SST `file`.
    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
        ParquetReaderBuilder::new(
            self.table_dir.clone(),
            self.path_type,
            file,
            self.object_store.clone(),
        )
    }

    /// Writes an SST (and its indexes) from `request.source`.
    ///
    /// Goes through the write cache when one is configured; otherwise writes
    /// directly to the object store. On success, caches parquet metadata of
    /// the produced files and returns their [`SstInfo`]s.
    pub async fn write_sst(
        &self,
        request: SstWriteRequest,
        write_opts: &WriteOptions,
        metrics: &mut Metrics,
    ) -> Result<SstInfoArray> {
        let region_id = request.metadata.region_id;
        let region_metadata = request.metadata.clone();
        let cache_manager = request.cache_manager.clone();

        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
            // Write-cache path: write locally, then upload to the remote store.
            write_cache
                .write_and_upload_sst(
                    request,
                    SstUploadRequest {
                        dest_path_provider: RegionFilePathFactory::new(
                            self.table_dir.clone(),
                            self.path_type,
                        ),
                        remote_store: self.object_store.clone(),
                    },
                    write_opts,
                    metrics,
                )
                .await?
        } else {
            // Direct path: write straight to the object store.
            let store = self.object_store.clone();
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let indexer_builder = IndexerBuilderImpl {
                build_type: request.op_type.into(),
                metadata: request.metadata.clone(),
                row_group_size: write_opts.row_group_size,
                puffin_manager: self
                    .puffin_manager_factory
                    .build(store, path_provider.clone()),
                write_cache_enabled: false,
                intermediate_manager: self.intermediate_manager.clone(),
                index_options: request.index_options,
                inverted_index_config: request.inverted_index_config,
                fulltext_index_config: request.fulltext_index_config,
                bloom_filter_index_config: request.bloom_filter_index_config,
                #[cfg(feature = "vector_index")]
                vector_index_config: request.vector_index_config,
            };
            // Cleaner removes atomic-write leftovers if the write fails.
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let mut writer = ParquetWriter::new_with_object_store(
                self.object_store.clone(),
                request.metadata,
                request.index_config,
                indexer_builder,
                path_provider,
                metrics,
            )
            .await
            .with_file_cleaner(cleaner);
            match request.sst_write_format {
                FormatType::PrimaryKey => {
                    writer
                        .write_all_flat_as_primary_key(
                            request.source,
                            request.max_sequence,
                            write_opts,
                        )
                        .await?
                }
                FormatType::Flat => {
                    writer
                        .write_all_flat(request.source, request.max_sequence, write_opts)
                        .await?
                }
            }
        };

        // Cache parquet metadata for produced files to speed up later reads.
        if !sst_info.is_empty() {
            for sst in &sst_info {
                if let Some(parquet_metadata) = &sst.file_metadata {
                    cache_manager.put_parquet_meta_data(
                        RegionFileId::new(region_id, sst.file_id),
                        parquet_metadata.clone(),
                        Some(region_metadata.clone()),
                    )
                }
            }
        }

        Ok(sst_info)
    }

    /// Uploads pre-encoded SST bytes for `region_id`, via the write cache when
    /// configured, otherwise directly to the object store. Cleans up temp
    /// files on failure and returns flush metrics with the write duration.
    pub(crate) async fn put_sst(
        &self,
        data: &bytes::Bytes,
        region_id: RegionId,
        sst_info: &SstInfo,
        cache_manager: &CacheManagerRef,
    ) -> Result<Metrics> {
        if let Some(write_cache) = cache_manager.write_cache() {
            let upload_request = SstUploadRequest {
                dest_path_provider: RegionFilePathFactory::new(
                    self.table_dir.clone(),
                    self.path_type,
                ),
                remote_store: self.object_store.clone(),
            };
            write_cache
                .put_and_upload_sst(data, region_id, sst_info, upload_request)
                .await
        } else {
            let start = Instant::now();
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let sst_file_path =
                path_provider.build_sst_file_path(RegionFileId::new(region_id, sst_info.file_id));
            let mut writer = self
                .object_store
                .writer_with(&sst_file_path)
                .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .context(OpenDalSnafu)?;
            // On any failure, remove atomic-write leftovers before returning.
            if let Err(err) = writer.write(data.clone()).await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            if let Err(err) = writer.close().await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            let mut metrics = Metrics::new(WriteType::Flush);
            metrics.write_batch = start.elapsed();
            Ok(metrics)
        }
    }

    /// Streams the `.parquet` and `.puffin` entries found by recursively
    /// listing the table dir on the object store.
    pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<StorageSstEntry>> + use<> {
        let object_store = self.object_store.clone();
        let table_dir = self.table_dir.clone();

        try_stream! {
            let mut lister = object_store
                .lister_with(table_dir.as_str())
                .recursive(true)
                .await
                .context(OpenDalSnafu)?;

            while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
                let metadata = entry.metadata();
                if metadata.is_dir() {
                    continue;
                }

                // Only SST data and index files are reported.
                let path = entry.path();
                if !path.ends_with(".parquet") && !path.ends_with(".puffin") {
                    continue;
                }

                // A zero content length is treated as unknown size.
                let file_size = metadata.content_length();
                let file_size = if file_size == 0 { None } else { Some(file_size) };
                let last_modified_ms = metadata
                    .last_modified()
                    .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));

                let entry = StorageSstEntry {
                    file_path: path.to_string(),
                    file_size,
                    last_modified_ms,
                    node_id: None,
                };

                yield entry;
            }
        }
    }
}
516
/// The kind of operation that triggers an SST write.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    Flush,
    Compact,
}
523
/// Everything needed by [`AccessLayer::write_sst`] to produce an SST
/// and its index files.
pub struct SstWriteRequest {
    // Whether this write comes from a flush or a compaction.
    pub op_type: OperationType,
    // Metadata of the region being written.
    pub metadata: RegionMetadataRef,
    // Source of the rows to write.
    pub source: FlatSource,
    // Cache manager; a configured write cache redirects the write path.
    pub cache_manager: CacheManagerRef,
    #[allow(dead_code)]
    pub storage: Option<String>,
    // Max sequence number of the data, if known.
    pub max_sequence: Option<SequenceNumber>,
    // On-disk format of the output SST.
    pub sst_write_format: FormatType,

    // Index-related configuration forwarded to the indexer builder.
    pub index_options: IndexOptions,
    pub index_config: IndexConfig,
    pub inverted_index_config: InvertedIndexConfig,
    pub fulltext_index_config: FulltextIndexConfig,
    pub bloom_filter_index_config: BloomFilterConfig,
    #[cfg(feature = "vector_index")]
    pub vector_index_config: crate::config::VectorIndexConfig,
}
544
/// Removes leftover files from the atomic write dir after a failed SST write.
pub(crate) struct TempFileCleaner {
    // Region whose temp files this cleaner targets.
    region_id: RegionId,
    // Store holding the atomic write dir.
    object_store: ObjectStore,
}
550
551impl TempFileCleaner {
552 pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
554 Self {
555 region_id,
556 object_store,
557 }
558 }
559
560 pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
563 let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
564 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();
565
566 Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
567 }
568
569 pub(crate) async fn clean_atomic_dir_files(
571 local_store: &ObjectStore,
572 names_to_remove: &[&str],
573 ) {
574 let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
577 if e.kind() != ErrorKind::NotFound {
578 common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
579 }
580 }) else {
581 return;
582 };
583
584 let actual_files: Vec<_> = entries
587 .into_iter()
588 .filter_map(|entry| {
589 if entry.metadata().is_dir() {
590 return None;
591 }
592
593 let should_remove = names_to_remove
595 .iter()
596 .any(|file| entry.name().starts_with(file));
597 if should_remove {
598 Some(entry.path().to_string())
599 } else {
600 None
601 }
602 })
603 .collect();
604
605 common_telemetry::warn!(
606 "Clean files {:?} under atomic write dir for {:?}",
607 actual_files,
608 names_to_remove
609 );
610
611 if let Err(e) = local_store.delete_iter(actual_files).await {
612 common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
613 }
614 }
615}
616
/// Builds a local filesystem object store rooted at `root` for caching,
/// wiping both the current and the legacy atomic write dirs first so stale
/// temp files from previous runs do not survive.
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
    let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
    clean_dir(&atomic_write_dir).await?;

    // Also clean the dir used by older versions.
    let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
    clean_dir(&old_atomic_temp_dir).await?;

    let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
    let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();

    Ok(with_instrument_layers(store, false))
}
630
631async fn clean_dir(dir: &str) -> Result<()> {
633 if tokio::fs::try_exists(dir)
634 .await
635 .context(CleanDirSnafu { dir })?
636 {
637 tokio::fs::remove_dir_all(dir)
638 .await
639 .context(CleanDirSnafu { dir })?;
640 }
641
642 Ok(())
643}
644
/// Resolves the storage paths of SST and index files for a given file id.
/// Implementations target either the remote layout or the local write cache.
pub trait FilePathProvider: Send + Sync {
    /// Path of the index (puffin) file for `file_id`, legacy unversioned layout.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String;

    /// Path of the index (puffin) file for a versioned `index_id`.
    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;

    /// Path of the SST (parquet) file for `file_id`.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
656
/// Path provider that resolves file paths inside the local write (file) cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
    // The file cache that owns the local paths.
    file_cache: FileCacheRef,
}
662
impl WriteCachePathProvider {
    /// Creates a provider backed by `file_cache`.
    pub fn new(file_cache: FileCacheRef) -> Self {
        Self { file_cache }
    }
}
669
670impl FilePathProvider for WriteCachePathProvider {
671 fn build_index_file_path(&self, file_id: RegionFileId) -> String {
672 let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
673 self.file_cache.cache_file_path(puffin_key)
674 }
675
676 fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
677 let puffin_key = IndexKey::new(
678 index_id.region_id(),
679 index_id.file_id(),
680 FileType::Puffin(index_id.version),
681 );
682 self.file_cache.cache_file_path(puffin_key)
683 }
684
685 fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
686 let parquet_file_key =
687 IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
688 self.file_cache.cache_file_path(parquet_file_key)
689 }
690}
691
/// Path provider that resolves file paths under a table dir on the object store.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
    // Root table directory.
    pub(crate) table_dir: String,
    // How region paths are derived from the table dir.
    pub(crate) path_type: PathType,
}
698
impl RegionFilePathFactory {
    /// Creates a factory for the given table dir and path type.
    pub fn new(table_dir: String, path_type: PathType) -> Self {
        Self {
            table_dir,
            path_type,
        }
    }
}
708
impl FilePathProvider for RegionFilePathFactory {
    /// Legacy (unversioned) index file path under the table dir.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
        location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
    }

    /// Versioned index file path under the table dir.
    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
        location::index_file_path(&self.table_dir, index_id, self.path_type)
    }

    /// SST (parquet) file path under the table dir.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
        location::sst_file_path(&self.table_dir, file_id, self.path_type)
    }
}