1use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use async_stream::try_stream;
19use common_time::Timestamp;
20use either::Either;
21use futures::{Stream, TryStreamExt};
22use object_store::services::Fs;
23use object_store::util::{join_dir, with_instrument_layers};
24use object_store::{ATOMIC_WRITE_DIR, ErrorKind, OLD_ATOMIC_WRITE_DIR, ObjectStore};
25use smallvec::SmallVec;
26use snafu::ResultExt;
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_request::PathType;
29use store_api::sst_entry::StorageSstEntry;
30use store_api::storage::{FileId, RegionId, SequenceNumber};
31
32use crate::cache::CacheManagerRef;
33use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
34use crate::cache::write_cache::SstUploadRequest;
35use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
36use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
37use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
38use crate::read::{FlatSource, Source};
39use crate::region::options::IndexOptions;
40use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
41use crate::sst::index::IndexerBuilderImpl;
42use crate::sst::index::intermediate::IntermediateManager;
43use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
44use crate::sst::location::{self, region_dir_from_table_dir};
45use crate::sst::parquet::reader::ParquetReaderBuilder;
46use crate::sst::parquet::writer::ParquetWriter;
47use crate::sst::parquet::{SstInfo, WriteOptions};
48use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
49
/// Shared, reference-counted handle to an [`AccessLayer`].
pub type AccessLayerRef = Arc<AccessLayer>;
/// Collection of [`SstInfo`]; stored inline for the common case of at most 2 files.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
53
/// The write path that produced a set of [`Metrics`]; decides which
/// histogram family the durations are reported to in [`Metrics::observe`].
#[derive(Eq, PartialEq, Debug)]
pub enum WriteType {
    /// SST written by a memtable flush.
    Flush,
    /// SST written by a compaction.
    Compaction,
}
62
/// Per-stage timing metrics collected while writing an SST file.
#[derive(Debug)]
pub struct Metrics {
    /// Which write path (flush or compaction) these durations belong to.
    pub(crate) write_type: WriteType,
    /// Duration of the `iter_source` stage (reading rows from the source).
    pub(crate) iter_source: Duration,
    /// Duration of the `write_batch` stage (writing batches to the SST).
    pub(crate) write_batch: Duration,
    /// Duration of the `update_index` stage.
    pub(crate) update_index: Duration,
    /// Duration of uploading the parquet (data) file.
    pub(crate) upload_parquet: Duration,
    /// Duration of uploading the puffin (index) file.
    pub(crate) upload_puffin: Duration,
    /// Duration of memtable compaction; only reported when non-zero.
    pub(crate) compact_memtable: Duration,
}
73
74impl Metrics {
75 pub fn new(write_type: WriteType) -> Self {
76 Self {
77 write_type,
78 iter_source: Default::default(),
79 write_batch: Default::default(),
80 update_index: Default::default(),
81 upload_parquet: Default::default(),
82 upload_puffin: Default::default(),
83 compact_memtable: Default::default(),
84 }
85 }
86
87 pub(crate) fn merge(mut self, other: Self) -> Self {
88 assert_eq!(self.write_type, other.write_type);
89 self.iter_source += other.iter_source;
90 self.write_batch += other.write_batch;
91 self.update_index += other.update_index;
92 self.upload_parquet += other.upload_parquet;
93 self.upload_puffin += other.upload_puffin;
94 self.compact_memtable += other.compact_memtable;
95 self
96 }
97
98 pub(crate) fn observe(self) {
99 match self.write_type {
100 WriteType::Flush => {
101 FLUSH_ELAPSED
102 .with_label_values(&["iter_source"])
103 .observe(self.iter_source.as_secs_f64());
104 FLUSH_ELAPSED
105 .with_label_values(&["write_batch"])
106 .observe(self.write_batch.as_secs_f64());
107 FLUSH_ELAPSED
108 .with_label_values(&["update_index"])
109 .observe(self.update_index.as_secs_f64());
110 FLUSH_ELAPSED
111 .with_label_values(&["upload_parquet"])
112 .observe(self.upload_parquet.as_secs_f64());
113 FLUSH_ELAPSED
114 .with_label_values(&["upload_puffin"])
115 .observe(self.upload_puffin.as_secs_f64());
116 if !self.compact_memtable.is_zero() {
117 FLUSH_ELAPSED
118 .with_label_values(&["compact_memtable"])
119 .observe(self.upload_puffin.as_secs_f64());
120 }
121 }
122 WriteType::Compaction => {
123 COMPACTION_STAGE_ELAPSED
124 .with_label_values(&["iter_source"])
125 .observe(self.iter_source.as_secs_f64());
126 COMPACTION_STAGE_ELAPSED
127 .with_label_values(&["write_batch"])
128 .observe(self.write_batch.as_secs_f64());
129 COMPACTION_STAGE_ELAPSED
130 .with_label_values(&["update_index"])
131 .observe(self.update_index.as_secs_f64());
132 COMPACTION_STAGE_ELAPSED
133 .with_label_values(&["upload_parquet"])
134 .observe(self.upload_parquet.as_secs_f64());
135 COMPACTION_STAGE_ELAPSED
136 .with_label_values(&["upload_puffin"])
137 .observe(self.upload_puffin.as_secs_f64());
138 }
139 };
140 }
141}
142
/// A layer to access SST (parquet) and index (puffin) files of a table
/// stored on an object store.
pub struct AccessLayer {
    /// Directory of the table data.
    table_dir: String,
    /// Controls how region/file paths are derived from `table_dir`.
    path_type: PathType,
    /// Target object store holding the table's files.
    object_store: ObjectStore,
    /// Factory for puffin managers that read/write index files.
    puffin_manager_factory: PuffinManagerFactory,
    /// Manager of intermediate files produced while building indexes.
    intermediate_manager: IntermediateManager,
}
155
156impl std::fmt::Debug for AccessLayer {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("AccessLayer")
159 .field("table_dir", &self.table_dir)
160 .finish()
161 }
162}
163
impl AccessLayer {
    /// Creates a new [`AccessLayer`] for the table under `table_dir` on the
    /// given `object_store`.
    pub fn new(
        table_dir: impl Into<String>,
        path_type: PathType,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
    ) -> AccessLayer {
        AccessLayer {
            table_dir: table_dir.into(),
            path_type,
            object_store,
            puffin_manager_factory,
            intermediate_manager,
        }
    }

    /// Returns the directory of the table data.
    pub fn table_dir(&self) -> &str {
        &self.table_dir
    }

    /// Returns the object store of the layer.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }

    /// Returns the path type used to build file paths.
    pub fn path_type(&self) -> PathType {
        self.path_type
    }

    /// Returns the puffin manager factory.
    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        &self.puffin_manager_factory
    }

    /// Returns the intermediate manager used during index creation.
    pub fn intermediate_manager(&self) -> &IntermediateManager {
        &self.intermediate_manager
    }

    /// Builds a puffin manager whose paths resolve under this layer's table
    /// directory on the layer's object store.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.object_store.clone();
        let path_provider =
            RegionFilePathFactory::new(self.table_dir().to_string(), self.path_type());
        self.puffin_manager_factory.build(store, path_provider)
    }

    /// Deletes the SST file of `region_file_id` and all of its index files
    /// (every version from 0 up to and including `index_file_id.version`).
    pub(crate) async fn delete_sst(
        &self,
        region_file_id: &RegionFileId,
        index_file_id: &RegionIndexId,
    ) -> Result<()> {
        let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
        self.object_store
            .delete(&path)
            .await
            .context(DeleteSstSnafu {
                file_id: region_file_id.file_id(),
            })?;

        // Older index versions may still exist; remove each one in turn.
        for version in 0..=index_file_id.version {
            let index_id = RegionIndexId::new(*region_file_id, version);
            self.delete_index(index_id).await?;
        }

        Ok(())
    }

    /// Deletes a single index (puffin) file identified by `index_file_id`.
    pub(crate) async fn delete_index(
        &self,
        index_file_id: RegionIndexId,
    ) -> Result<(), crate::error::Error> {
        // NOTE(review): rebuilding the id here looks redundant — passing
        // `index_file_id` directly may be equivalent; confirm before changing.
        let path = location::index_file_path(
            &self.table_dir,
            RegionIndexId::new(index_file_id.file_id, index_file_id.version),
            self.path_type,
        );
        self.object_store
            .delete(&path)
            .await
            .context(DeleteIndexSnafu {
                file_id: index_file_id.file_id(),
            })?;
        Ok(())
    }

    /// Returns the directory of the given region under this table.
    pub fn build_region_dir(&self, region_id: RegionId) -> String {
        region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
    }

    /// Returns a builder for reading the SST file referenced by `file`.
    pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
        ParquetReaderBuilder::new(
            self.table_dir.clone(),
            self.path_type,
            file,
            self.object_store.clone(),
        )
    }

    /// Writes an SST (and its index) described by `request`.
    ///
    /// When a write cache is configured the data is written to the cache and
    /// uploaded to the remote store; otherwise it is written directly to the
    /// object store. On success, parquet metadata of the written files is put
    /// into the cache manager. Returns info of the written files; the array is
    /// empty if nothing was written.
    pub async fn write_sst(
        &self,
        request: SstWriteRequest,
        write_opts: &WriteOptions,
        metrics: &mut Metrics,
    ) -> Result<SstInfoArray> {
        let region_id = request.metadata.region_id;
        let cache_manager = request.cache_manager.clone();

        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
            // Write-cache path: write locally, then upload to the remote store.
            write_cache
                .write_and_upload_sst(
                    request,
                    SstUploadRequest {
                        dest_path_provider: RegionFilePathFactory::new(
                            self.table_dir.clone(),
                            self.path_type,
                        ),
                        remote_store: self.object_store.clone(),
                    },
                    write_opts,
                    metrics,
                )
                .await?
        } else {
            // Direct path: write the SST and build its index against the
            // remote object store.
            let store = self.object_store.clone();
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let indexer_builder = IndexerBuilderImpl {
                build_type: request.op_type.into(),
                metadata: request.metadata.clone(),
                row_group_size: write_opts.row_group_size,
                puffin_manager: self
                    .puffin_manager_factory
                    .build(store, path_provider.clone()),
                write_cache_enabled: false,
                intermediate_manager: self.intermediate_manager.clone(),
                index_options: request.index_options,
                inverted_index_config: request.inverted_index_config,
                fulltext_index_config: request.fulltext_index_config,
                bloom_filter_index_config: request.bloom_filter_index_config,
            };
            // Cleaner removes temp files from the atomic write dir if the
            // write fails midway.
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let mut writer = ParquetWriter::new_with_object_store(
                self.object_store.clone(),
                request.metadata,
                request.index_config,
                indexer_builder,
                path_provider,
                metrics,
            )
            .await
            .with_file_cleaner(cleaner);
            match request.source {
                // Row-based source carries the max sequence number.
                Either::Left(source) => {
                    writer
                        .write_all(source, request.max_sequence, write_opts)
                        .await?
                }
                // Columnar (flat) source.
                Either::Right(flat_source) => {
                    writer.write_all_flat(flat_source, write_opts).await?
                }
            }
        };

        // Cache the parquet metadata so later reads can skip fetching it.
        if !sst_info.is_empty() {
            for sst in &sst_info {
                if let Some(parquet_metadata) = &sst.file_metadata {
                    cache_manager.put_parquet_meta_data(
                        RegionFileId::new(region_id, sst.file_id),
                        parquet_metadata.clone(),
                    )
                }
            }
        }

        Ok(sst_info)
    }

    /// Puts pre-encoded SST bytes into storage.
    ///
    /// Uses the write cache when available; otherwise streams `data` directly
    /// to the object store, cleaning temp files on failure. Returns flush
    /// metrics covering the write.
    pub(crate) async fn put_sst(
        &self,
        data: &bytes::Bytes,
        region_id: RegionId,
        sst_info: &SstInfo,
        cache_manager: &CacheManagerRef,
    ) -> Result<Metrics> {
        if let Some(write_cache) = cache_manager.write_cache() {
            let upload_request = SstUploadRequest {
                dest_path_provider: RegionFilePathFactory::new(
                    self.table_dir.clone(),
                    self.path_type,
                ),
                remote_store: self.object_store.clone(),
            };
            write_cache
                .put_and_upload_sst(data, region_id, sst_info, upload_request)
                .await
        } else {
            let start = Instant::now();
            let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
            let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
            let sst_file_path =
                path_provider.build_sst_file_path(RegionFileId::new(region_id, sst_info.file_id));
            let mut writer = self
                .object_store
                .writer_with(&sst_file_path)
                .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
                .concurrent(DEFAULT_WRITE_CONCURRENCY)
                .await
                .context(OpenDalSnafu)?;
            // On any failure, remove temp files left in the atomic write dir
            // before propagating the error.
            if let Err(err) = writer.write(data.clone()).await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            if let Err(err) = writer.close().await.context(OpenDalSnafu) {
                cleaner.clean_by_file_id(sst_info.file_id).await;
                return Err(err);
            }
            let mut metrics = Metrics::new(WriteType::Flush);
            metrics.write_batch = start.elapsed();
            Ok(metrics)
        }
    }

    /// Streams the SST-related entries (`.parquet` / `.puffin` files) found by
    /// recursively listing the table directory on the object store.
    pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<StorageSstEntry>> + use<> {
        let object_store = self.object_store.clone();
        let table_dir = self.table_dir.clone();

        try_stream! {
            let mut lister = object_store
                .lister_with(table_dir.as_str())
                .recursive(true)
                .await
                .context(OpenDalSnafu)?;

            while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
                let metadata = entry.metadata();
                if metadata.is_dir() {
                    continue;
                }

                // Only report data and index files.
                let path = entry.path();
                if !path.ends_with(".parquet") && !path.ends_with(".puffin") {
                    continue;
                }

                // A zero content length is treated as unknown size.
                let file_size = metadata.content_length();
                let file_size = if file_size == 0 { None } else { Some(file_size) };
                let last_modified_ms = metadata
                    .last_modified()
                    .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));

                let entry = StorageSstEntry {
                    file_path: path.to_string(),
                    file_size,
                    last_modified_ms,
                    node_id: None,
                };

                yield entry;
            }
        }
    }
}
446
/// The origin of an SST write operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// Write triggered by a memtable flush.
    Flush,
    /// Write triggered by a compaction.
    Compact,
}
453
/// Contents to build an SST (and its index) from.
pub struct SstWriteRequest {
    /// Whether the write comes from a flush or a compaction.
    pub op_type: OperationType,
    /// Metadata of the region being written.
    pub metadata: RegionMetadataRef,
    /// Data source: row-based [`Source`] or columnar [`FlatSource`].
    pub source: Either<Source, FlatSource>,
    /// Cache manager used to populate caches after the write.
    pub cache_manager: CacheManagerRef,
    // Kept for interface compatibility; not read on this path.
    #[allow(dead_code)]
    pub storage: Option<String>,
    /// Max sequence number of the data; only consumed by the row-based path.
    pub max_sequence: Option<SequenceNumber>,

    /// Per-region index options.
    pub index_options: IndexOptions,
    /// General index configuration.
    pub index_config: IndexConfig,
    /// Inverted index configuration.
    pub inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration.
    pub fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub bloom_filter_index_config: BloomFilterConfig,
}
471
/// Best-effort cleaner for temporary files a failed write may leave in the
/// atomic write directory of an object store.
pub(crate) struct TempFileCleaner {
    /// Region the temporary files belong to.
    region_id: RegionId,
    /// Store whose atomic write dir is cleaned.
    object_store: ObjectStore,
}
477
478impl TempFileCleaner {
479 pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
481 Self {
482 region_id,
483 object_store,
484 }
485 }
486
487 pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
490 let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
491 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin(0)).to_string();
492
493 Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
494 }
495
496 pub(crate) async fn clean_atomic_dir_files(
498 local_store: &ObjectStore,
499 names_to_remove: &[&str],
500 ) {
501 let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
504 if e.kind() != ErrorKind::NotFound {
505 common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
506 }
507 }) else {
508 return;
509 };
510
511 let actual_files: Vec<_> = entries
514 .into_iter()
515 .filter_map(|entry| {
516 if entry.metadata().is_dir() {
517 return None;
518 }
519
520 let should_remove = names_to_remove
522 .iter()
523 .any(|file| entry.name().starts_with(file));
524 if should_remove {
525 Some(entry.path().to_string())
526 } else {
527 None
528 }
529 })
530 .collect();
531
532 common_telemetry::warn!(
533 "Clean files {:?} under atomic write dir for {:?}",
534 actual_files,
535 names_to_remove
536 );
537
538 if let Err(e) = local_store.delete_iter(actual_files).await {
539 common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
540 }
541 }
542}
543
544pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
545 let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
546 clean_dir(&atomic_write_dir).await?;
547
548 let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
550 clean_dir(&old_atomic_temp_dir).await?;
551
552 let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
553 let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
554
555 Ok(with_instrument_layers(store, false))
556}
557
558async fn clean_dir(dir: &str) -> Result<()> {
560 if tokio::fs::try_exists(dir)
561 .await
562 .context(CleanDirSnafu { dir })?
563 {
564 tokio::fs::remove_dir_all(dir)
565 .await
566 .context(CleanDirSnafu { dir })?;
567 }
568
569 Ok(())
570}
571
/// Provides paths of SST and index files for a given file id.
pub trait FilePathProvider: Send + Sync {
    /// Path of the index (puffin) file for `file_id`, using the legacy
    /// unversioned layout.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String;

    /// Path of the index file for a specific index version.
    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String;

    /// Path of the SST (parquet) file for `file_id`.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
583
/// Path provider that resolves SST and index paths inside the local file cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
    /// The file cache used to compute cache file paths.
    file_cache: FileCacheRef,
}
589
impl WriteCachePathProvider {
    /// Creates a provider backed by the given file cache.
    pub fn new(file_cache: FileCacheRef) -> Self {
        Self { file_cache }
    }
}
596
597impl FilePathProvider for WriteCachePathProvider {
598 fn build_index_file_path(&self, file_id: RegionFileId) -> String {
599 let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin(0));
600 self.file_cache.cache_file_path(puffin_key)
601 }
602
603 fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
604 let puffin_key = IndexKey::new(
605 index_id.region_id(),
606 index_id.file_id(),
607 FileType::Puffin(index_id.version),
608 );
609 self.file_cache.cache_file_path(puffin_key)
610 }
611
612 fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
613 let parquet_file_key =
614 IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
615 self.file_cache.cache_file_path(parquet_file_key)
616 }
617}
618
/// Path provider that resolves SST and index paths under a table directory on
/// the remote store.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
    /// Directory of the table data.
    pub(crate) table_dir: String,
    /// Controls how paths are derived from the table dir.
    pub(crate) path_type: PathType,
}
625
626impl RegionFilePathFactory {
627 pub fn new(table_dir: String, path_type: PathType) -> Self {
629 Self {
630 table_dir,
631 path_type,
632 }
633 }
634}
635
impl FilePathProvider for RegionFilePathFactory {
    /// Legacy (unversioned) index file path under the table dir.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String {
        location::index_file_path_legacy(&self.table_dir, file_id, self.path_type)
    }

    /// Versioned index file path under the table dir.
    fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
        location::index_file_path(&self.table_dir, index_id, self.path_type)
    }

    /// SST (parquet) file path under the table dir.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
        location::sst_file_path(&self.table_dir, file_id, self.path_type)
    }
}