1use std::sync::Arc;
16use std::time::Duration;
17
18use object_store::services::Fs;
19use object_store::util::{join_dir, with_instrument_layers};
20use object_store::{ErrorKind, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
21use smallvec::SmallVec;
22use snafu::ResultExt;
23use store_api::metadata::RegionMetadataRef;
24use store_api::region_request::PathType;
25use store_api::storage::{RegionId, SequenceNumber};
26
27use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
28use crate::cache::write_cache::SstUploadRequest;
29use crate::cache::CacheManagerRef;
30use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
31use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
32use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
33use crate::read::Source;
34use crate::region::options::IndexOptions;
35use crate::sst::file::{FileHandle, FileId, FileMeta, RegionFileId};
36use crate::sst::index::intermediate::IntermediateManager;
37use crate::sst::index::puffin_manager::PuffinManagerFactory;
38use crate::sst::index::IndexerBuilderImpl;
39use crate::sst::location::{self, region_dir_from_table_dir};
40use crate::sst::parquet::reader::ParquetReaderBuilder;
41use crate::sst::parquet::writer::ParquetWriter;
42use crate::sst::parquet::{SstInfo, WriteOptions};
43
44pub type AccessLayerRef = Arc<AccessLayer>;
45pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
47
/// Kind of write that produced a [`Metrics`] value.
///
/// Decides which histogram `Metrics::observe` reports to: `FLUSH_ELAPSED`
/// for [`WriteType::Flush`], `COMPACTION_STAGE_ELAPSED` for
/// [`WriteType::Compaction`].
#[derive(Eq, PartialEq, Debug)]
pub enum WriteType {
    /// SSTs written by a flush.
    Flush,
    /// SSTs written by a compaction.
    Compaction,
}
56
57#[derive(Debug)]
58pub struct Metrics {
59 pub(crate) write_type: WriteType,
60 pub(crate) iter_source: Duration,
61 pub(crate) write_batch: Duration,
62 pub(crate) update_index: Duration,
63 pub(crate) upload_parquet: Duration,
64 pub(crate) upload_puffin: Duration,
65}
66
67impl Metrics {
68 pub(crate) fn new(write_type: WriteType) -> Self {
69 Self {
70 write_type,
71 iter_source: Default::default(),
72 write_batch: Default::default(),
73 update_index: Default::default(),
74 upload_parquet: Default::default(),
75 upload_puffin: Default::default(),
76 }
77 }
78
79 pub(crate) fn merge(mut self, other: Self) -> Self {
80 assert_eq!(self.write_type, other.write_type);
81 self.iter_source += other.iter_source;
82 self.write_batch += other.write_batch;
83 self.update_index += other.update_index;
84 self.upload_parquet += other.upload_parquet;
85 self.upload_puffin += other.upload_puffin;
86 self
87 }
88
89 pub(crate) fn observe(self) {
90 match self.write_type {
91 WriteType::Flush => {
92 FLUSH_ELAPSED
93 .with_label_values(&["iter_source"])
94 .observe(self.iter_source.as_secs_f64());
95 FLUSH_ELAPSED
96 .with_label_values(&["write_batch"])
97 .observe(self.write_batch.as_secs_f64());
98 FLUSH_ELAPSED
99 .with_label_values(&["update_index"])
100 .observe(self.update_index.as_secs_f64());
101 FLUSH_ELAPSED
102 .with_label_values(&["upload_parquet"])
103 .observe(self.upload_parquet.as_secs_f64());
104 FLUSH_ELAPSED
105 .with_label_values(&["upload_puffin"])
106 .observe(self.upload_puffin.as_secs_f64());
107 }
108 WriteType::Compaction => {
109 COMPACTION_STAGE_ELAPSED
110 .with_label_values(&["iter_source"])
111 .observe(self.iter_source.as_secs_f64());
112 COMPACTION_STAGE_ELAPSED
113 .with_label_values(&["write_batch"])
114 .observe(self.write_batch.as_secs_f64());
115 COMPACTION_STAGE_ELAPSED
116 .with_label_values(&["update_index"])
117 .observe(self.update_index.as_secs_f64());
118 COMPACTION_STAGE_ELAPSED
119 .with_label_values(&["upload_parquet"])
120 .observe(self.upload_parquet.as_secs_f64());
121 COMPACTION_STAGE_ELAPSED
122 .with_label_values(&["upload_puffin"])
123 .observe(self.upload_puffin.as_secs_f64());
124 }
125 };
126 }
127}
128
129pub struct AccessLayer {
131 table_dir: String,
132 path_type: PathType,
134 object_store: ObjectStore,
136 puffin_manager_factory: PuffinManagerFactory,
138 intermediate_manager: IntermediateManager,
140}
141
142impl std::fmt::Debug for AccessLayer {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("AccessLayer")
145 .field("table_dir", &self.table_dir)
146 .finish()
147 }
148}
149
150impl AccessLayer {
151 pub fn new(
153 table_dir: impl Into<String>,
154 path_type: PathType,
155 object_store: ObjectStore,
156 puffin_manager_factory: PuffinManagerFactory,
157 intermediate_manager: IntermediateManager,
158 ) -> AccessLayer {
159 AccessLayer {
160 table_dir: table_dir.into(),
161 path_type,
162 object_store,
163 puffin_manager_factory,
164 intermediate_manager,
165 }
166 }
167
168 pub fn table_dir(&self) -> &str {
170 &self.table_dir
171 }
172
173 pub fn object_store(&self) -> &ObjectStore {
175 &self.object_store
176 }
177
178 pub fn path_type(&self) -> PathType {
180 self.path_type
181 }
182
183 pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
185 &self.puffin_manager_factory
186 }
187
188 pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
190 let path = location::sst_file_path(&self.table_dir, file_meta.file_id(), self.path_type);
191 self.object_store
192 .delete(&path)
193 .await
194 .context(DeleteSstSnafu {
195 file_id: file_meta.file_id,
196 })?;
197
198 let path = location::index_file_path(&self.table_dir, file_meta.file_id(), self.path_type);
199 self.object_store
200 .delete(&path)
201 .await
202 .context(DeleteIndexSnafu {
203 file_id: file_meta.file_id,
204 })?;
205
206 Ok(())
207 }
208
209 pub fn build_region_dir(&self, region_id: RegionId) -> String {
211 region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
212 }
213
214 pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
216 ParquetReaderBuilder::new(
217 self.table_dir.clone(),
218 self.path_type,
219 file,
220 self.object_store.clone(),
221 )
222 }
223
224 pub async fn write_sst(
228 &self,
229 request: SstWriteRequest,
230 write_opts: &WriteOptions,
231 write_type: WriteType,
232 ) -> Result<(SstInfoArray, Metrics)> {
233 let region_id = request.metadata.region_id;
234 let cache_manager = request.cache_manager.clone();
235
236 let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
237 write_cache
239 .write_and_upload_sst(
240 request,
241 SstUploadRequest {
242 dest_path_provider: RegionFilePathFactory {
243 table_dir: self.table_dir.clone(),
244 path_type: self.path_type,
245 },
246 remote_store: self.object_store.clone(),
247 },
248 write_opts,
249 write_type,
250 )
251 .await?
252 } else {
253 let store = self.object_store.clone();
255 let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
256 let indexer_builder = IndexerBuilderImpl {
257 op_type: request.op_type,
258 metadata: request.metadata.clone(),
259 row_group_size: write_opts.row_group_size,
260 puffin_manager: self
261 .puffin_manager_factory
262 .build(store, path_provider.clone()),
263 intermediate_manager: self.intermediate_manager.clone(),
264 index_options: request.index_options,
265 inverted_index_config: request.inverted_index_config,
266 fulltext_index_config: request.fulltext_index_config,
267 bloom_filter_index_config: request.bloom_filter_index_config,
268 };
269 let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
273 let mut writer = ParquetWriter::new_with_object_store(
274 self.object_store.clone(),
275 request.metadata,
276 indexer_builder,
277 path_provider,
278 Metrics::new(write_type),
279 )
280 .await
281 .with_file_cleaner(cleaner);
282 let ssts = writer
283 .write_all(request.source, request.max_sequence, write_opts)
284 .await?;
285 let metrics = writer.into_metrics();
286 (ssts, metrics)
287 };
288
289 if !sst_info.is_empty() {
291 for sst in &sst_info {
292 if let Some(parquet_metadata) = &sst.file_metadata {
293 cache_manager.put_parquet_meta_data(
294 RegionFileId::new(region_id, sst.file_id),
295 parquet_metadata.clone(),
296 )
297 }
298 }
299 }
300
301 Ok((sst_info, metrics))
302 }
303}
304
/// Operation producing an SST. It is handed to the index builder as `op_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The SST comes from a flush.
    Flush,
    /// The SST comes from a compaction.
    Compact,
}
311
312pub struct SstWriteRequest {
314 pub op_type: OperationType,
315 pub metadata: RegionMetadataRef,
316 pub source: Source,
317 pub cache_manager: CacheManagerRef,
318 #[allow(dead_code)]
319 pub storage: Option<String>,
320 pub max_sequence: Option<SequenceNumber>,
321
322 pub index_options: IndexOptions,
324 pub inverted_index_config: InvertedIndexConfig,
325 pub fulltext_index_config: FulltextIndexConfig,
326 pub bloom_filter_index_config: BloomFilterConfig,
327}
328
329pub(crate) struct TempFileCleaner {
331 region_id: RegionId,
332 object_store: ObjectStore,
333}
334
335impl TempFileCleaner {
336 pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
338 Self {
339 region_id,
340 object_store,
341 }
342 }
343
344 pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
346 let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
347 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
348
349 Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
350 }
351
352 pub(crate) async fn clean_atomic_dir_files(
354 local_store: &ObjectStore,
355 names_to_remove: &[&str],
356 ) {
357 let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
360 if e.kind() != ErrorKind::NotFound {
361 common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
362 }
363 }) else {
364 return;
365 };
366
367 let actual_files: Vec<_> = entries
370 .into_iter()
371 .filter_map(|entry| {
372 if entry.metadata().is_dir() {
373 return None;
374 }
375
376 let should_remove = names_to_remove
378 .iter()
379 .any(|file| entry.name().starts_with(file));
380 if should_remove {
381 Some(entry.path().to_string())
382 } else {
383 None
384 }
385 })
386 .collect();
387
388 common_telemetry::warn!(
389 "Clean files {:?} under atomic write dir for {:?}",
390 actual_files,
391 names_to_remove
392 );
393
394 if let Err(e) = local_store.delete_iter(actual_files).await {
395 common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
396 }
397 }
398}
399
400pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
401 let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
402 clean_dir(&atomic_write_dir).await?;
403
404 let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
406 clean_dir(&old_atomic_temp_dir).await?;
407
408 let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
409 let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
410
411 Ok(with_instrument_layers(store, false))
412}
413
414async fn clean_dir(dir: &str) -> Result<()> {
416 if tokio::fs::try_exists(dir)
417 .await
418 .context(CleanDirSnafu { dir })?
419 {
420 tokio::fs::remove_dir_all(dir)
421 .await
422 .context(CleanDirSnafu { dir })?;
423 }
424
425 Ok(())
426}
427
428pub trait FilePathProvider: Send + Sync {
430 fn build_index_file_path(&self, file_id: RegionFileId) -> String;
432
433 fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
435}
436
437#[derive(Clone)]
439pub(crate) struct WriteCachePathProvider {
440 file_cache: FileCacheRef,
441}
442
443impl WriteCachePathProvider {
444 pub fn new(file_cache: FileCacheRef) -> Self {
446 Self { file_cache }
447 }
448}
449
450impl FilePathProvider for WriteCachePathProvider {
451 fn build_index_file_path(&self, file_id: RegionFileId) -> String {
452 let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
453 self.file_cache.cache_file_path(puffin_key)
454 }
455
456 fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
457 let parquet_file_key =
458 IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
459 self.file_cache.cache_file_path(parquet_file_key)
460 }
461}
462
463#[derive(Clone, Debug)]
465pub(crate) struct RegionFilePathFactory {
466 pub(crate) table_dir: String,
467 pub(crate) path_type: PathType,
468}
469
470impl RegionFilePathFactory {
471 pub fn new(table_dir: String, path_type: PathType) -> Self {
473 Self {
474 table_dir,
475 path_type,
476 }
477 }
478}
479
480impl FilePathProvider for RegionFilePathFactory {
481 fn build_index_file_path(&self, file_id: RegionFileId) -> String {
482 location::index_file_path(&self.table_dir, file_id, self.path_type)
483 }
484
485 fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
486 location::sst_file_path(&self.table_dir, file_id, self.path_type)
487 }
488}