1use std::sync::Arc;
16use std::time::Duration;
17
18use async_stream::try_stream;
19use common_time::Timestamp;
20use futures::{Stream, TryStreamExt};
21use object_store::services::Fs;
22use object_store::util::{join_dir, with_instrument_layers};
23use object_store::{ErrorKind, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
24use smallvec::SmallVec;
25use snafu::ResultExt;
26use store_api::metadata::RegionMetadataRef;
27use store_api::region_request::PathType;
28use store_api::sst_entry::StorageSstEntry;
29use store_api::storage::{RegionId, SequenceNumber};
30
31use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
32use crate::cache::write_cache::SstUploadRequest;
33use crate::cache::CacheManagerRef;
34use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
35use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
36use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
37use crate::read::Source;
38use crate::region::options::IndexOptions;
39use crate::sst::file::{FileHandle, FileId, RegionFileId};
40use crate::sst::index::intermediate::IntermediateManager;
41use crate::sst::index::puffin_manager::PuffinManagerFactory;
42use crate::sst::index::IndexerBuilderImpl;
43use crate::sst::location::{self, region_dir_from_table_dir};
44use crate::sst::parquet::reader::ParquetReaderBuilder;
45use crate::sst::parquet::writer::ParquetWriter;
46use crate::sst::parquet::{SstInfo, WriteOptions};
47
/// Shared, reference-counted handle to an [`AccessLayer`].
pub type AccessLayerRef = Arc<AccessLayer>;
/// Collection of [`SstInfo`] values; up to two are stored inline before spilling to the heap.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
51
/// Kind of write that a set of [`Metrics`] belongs to.
///
/// [`Metrics::observe`] uses it to choose which histogram receives the timings.
#[derive(Eq, PartialEq, Debug)]
pub enum WriteType {
    /// Timings are reported to `FLUSH_ELAPSED`.
    Flush,
    /// Timings are reported to `COMPACTION_STAGE_ELAPSED`.
    Compaction,
}
60
/// Per-stage elapsed times of an SST write.
///
/// Each duration is reported by [`Metrics::observe`] under the stage label that
/// matches its field name.
#[derive(Debug)]
pub struct Metrics {
    /// Type of write these timings belong to; selects the target histogram.
    pub(crate) write_type: WriteType,
    /// Duration reported under the `iter_source` label.
    pub(crate) iter_source: Duration,
    /// Duration reported under the `write_batch` label.
    pub(crate) write_batch: Duration,
    /// Duration reported under the `update_index` label.
    pub(crate) update_index: Duration,
    /// Duration reported under the `upload_parquet` label.
    pub(crate) upload_parquet: Duration,
    /// Duration reported under the `upload_puffin` label.
    pub(crate) upload_puffin: Duration,
}
70
71impl Metrics {
72 pub(crate) fn new(write_type: WriteType) -> Self {
73 Self {
74 write_type,
75 iter_source: Default::default(),
76 write_batch: Default::default(),
77 update_index: Default::default(),
78 upload_parquet: Default::default(),
79 upload_puffin: Default::default(),
80 }
81 }
82
83 pub(crate) fn merge(mut self, other: Self) -> Self {
84 assert_eq!(self.write_type, other.write_type);
85 self.iter_source += other.iter_source;
86 self.write_batch += other.write_batch;
87 self.update_index += other.update_index;
88 self.upload_parquet += other.upload_parquet;
89 self.upload_puffin += other.upload_puffin;
90 self
91 }
92
93 pub(crate) fn observe(self) {
94 match self.write_type {
95 WriteType::Flush => {
96 FLUSH_ELAPSED
97 .with_label_values(&["iter_source"])
98 .observe(self.iter_source.as_secs_f64());
99 FLUSH_ELAPSED
100 .with_label_values(&["write_batch"])
101 .observe(self.write_batch.as_secs_f64());
102 FLUSH_ELAPSED
103 .with_label_values(&["update_index"])
104 .observe(self.update_index.as_secs_f64());
105 FLUSH_ELAPSED
106 .with_label_values(&["upload_parquet"])
107 .observe(self.upload_parquet.as_secs_f64());
108 FLUSH_ELAPSED
109 .with_label_values(&["upload_puffin"])
110 .observe(self.upload_puffin.as_secs_f64());
111 }
112 WriteType::Compaction => {
113 COMPACTION_STAGE_ELAPSED
114 .with_label_values(&["iter_source"])
115 .observe(self.iter_source.as_secs_f64());
116 COMPACTION_STAGE_ELAPSED
117 .with_label_values(&["write_batch"])
118 .observe(self.write_batch.as_secs_f64());
119 COMPACTION_STAGE_ELAPSED
120 .with_label_values(&["update_index"])
121 .observe(self.update_index.as_secs_f64());
122 COMPACTION_STAGE_ELAPSED
123 .with_label_values(&["upload_parquet"])
124 .observe(self.upload_parquet.as_secs_f64());
125 COMPACTION_STAGE_ELAPSED
126 .with_label_values(&["upload_puffin"])
127 .observe(self.upload_puffin.as_secs_f64());
128 }
129 };
130 }
131}
132
/// Access layer for the SST and index files of a table stored in an object store.
pub struct AccessLayer {
    /// Directory of the table; region and file paths are derived from it.
    table_dir: String,
    /// Path type handed to the `location` helpers when building paths.
    path_type: PathType,
    /// Object store the files are read from, written to and deleted from.
    object_store: ObjectStore,
    /// Factory that builds the puffin manager used when writing indexes.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager handed to the indexer builder.
    intermediate_manager: IntermediateManager,
}
145
146impl std::fmt::Debug for AccessLayer {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 f.debug_struct("AccessLayer")
149 .field("table_dir", &self.table_dir)
150 .finish()
151 }
152}
153
154impl AccessLayer {
155 pub fn new(
157 table_dir: impl Into<String>,
158 path_type: PathType,
159 object_store: ObjectStore,
160 puffin_manager_factory: PuffinManagerFactory,
161 intermediate_manager: IntermediateManager,
162 ) -> AccessLayer {
163 AccessLayer {
164 table_dir: table_dir.into(),
165 path_type,
166 object_store,
167 puffin_manager_factory,
168 intermediate_manager,
169 }
170 }
171
172 pub fn table_dir(&self) -> &str {
174 &self.table_dir
175 }
176
177 pub fn object_store(&self) -> &ObjectStore {
179 &self.object_store
180 }
181
182 pub fn path_type(&self) -> PathType {
184 self.path_type
185 }
186
187 pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
189 &self.puffin_manager_factory
190 }
191
192 pub fn intermediate_manager(&self) -> &IntermediateManager {
194 &self.intermediate_manager
195 }
196
197 pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
199 let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
200 self.object_store
201 .delete(&path)
202 .await
203 .context(DeleteSstSnafu {
204 file_id: region_file_id.file_id(),
205 })?;
206
207 let path = location::index_file_path(&self.table_dir, *region_file_id, self.path_type);
208 self.object_store
209 .delete(&path)
210 .await
211 .context(DeleteIndexSnafu {
212 file_id: region_file_id.file_id(),
213 })?;
214
215 Ok(())
216 }
217
218 pub fn build_region_dir(&self, region_id: RegionId) -> String {
220 region_dir_from_table_dir(&self.table_dir, region_id, self.path_type)
221 }
222
223 pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
225 ParquetReaderBuilder::new(
226 self.table_dir.clone(),
227 self.path_type,
228 file,
229 self.object_store.clone(),
230 )
231 }
232
233 pub async fn write_sst(
237 &self,
238 request: SstWriteRequest,
239 write_opts: &WriteOptions,
240 write_type: WriteType,
241 ) -> Result<(SstInfoArray, Metrics)> {
242 let region_id = request.metadata.region_id;
243 let cache_manager = request.cache_manager.clone();
244
245 let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
246 write_cache
248 .write_and_upload_sst(
249 request,
250 SstUploadRequest {
251 dest_path_provider: RegionFilePathFactory::new(
252 self.table_dir.clone(),
253 self.path_type,
254 ),
255 remote_store: self.object_store.clone(),
256 },
257 write_opts,
258 write_type,
259 )
260 .await?
261 } else {
262 let store = self.object_store.clone();
264 let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
265 let indexer_builder = IndexerBuilderImpl {
266 op_type: request.op_type,
267 metadata: request.metadata.clone(),
268 row_group_size: write_opts.row_group_size,
269 puffin_manager: self
270 .puffin_manager_factory
271 .build(store, path_provider.clone()),
272 intermediate_manager: self.intermediate_manager.clone(),
273 index_options: request.index_options,
274 inverted_index_config: request.inverted_index_config,
275 fulltext_index_config: request.fulltext_index_config,
276 bloom_filter_index_config: request.bloom_filter_index_config,
277 };
278 let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
282 let mut writer = ParquetWriter::new_with_object_store(
283 self.object_store.clone(),
284 request.metadata,
285 indexer_builder,
286 path_provider,
287 Metrics::new(write_type),
288 )
289 .await
290 .with_file_cleaner(cleaner);
291 let ssts = writer
292 .write_all(request.source, request.max_sequence, write_opts)
293 .await?;
294 let metrics = writer.into_metrics();
295 (ssts, metrics)
296 };
297
298 if !sst_info.is_empty() {
300 for sst in &sst_info {
301 if let Some(parquet_metadata) = &sst.file_metadata {
302 cache_manager.put_parquet_meta_data(
303 RegionFileId::new(region_id, sst.file_id),
304 parquet_metadata.clone(),
305 )
306 }
307 }
308 }
309
310 Ok((sst_info, metrics))
311 }
312
313 pub fn storage_sst_entries(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
315 let object_store = self.object_store.clone();
316 let table_dir = self.table_dir.clone();
317
318 try_stream! {
319 let mut lister = object_store
320 .lister_with(table_dir.as_str())
321 .recursive(true)
322 .await
323 .context(OpenDalSnafu)?;
324
325 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
326 let metadata = entry.metadata();
327 if metadata.is_dir() {
328 continue;
329 }
330
331 let path = entry.path();
332 if !path.ends_with(".parquet") && !path.ends_with(".puffin") {
333 continue;
334 }
335
336 let file_size = metadata.content_length();
337 let file_size = if file_size == 0 { None } else { Some(file_size) };
338 let last_modified_ms = metadata
339 .last_modified()
340 .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));
341
342 let entry = StorageSstEntry {
343 file_path: path.to_string(),
344 file_size,
345 last_modified_ms,
346 };
347
348 yield entry;
349 }
350 }
351 }
352}
353
/// Operation an SST write request originates from.
///
/// Carried by [`SstWriteRequest`] and forwarded to the indexer builder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The request comes from a flush.
    Flush,
    /// The request comes from a compaction.
    Compact,
}
360
/// Everything `AccessLayer::write_sst` needs to write SST files.
pub struct SstWriteRequest {
    /// Operation type, forwarded to the indexer builder.
    pub op_type: OperationType,
    /// Region metadata; supplies the region id and is handed to the writer and the
    /// indexer builder.
    pub metadata: RegionMetadataRef,
    /// Source of the data to write.
    pub source: Source,
    /// Cache manager; provides the optional write cache and receives the parquet
    /// metadata of written SSTs.
    pub cache_manager: CacheManagerRef,
    /// NOTE(review): presumably a storage name; it is not read by
    /// `AccessLayer::write_sst` — confirm its purpose against callers.
    #[allow(dead_code)]
    pub storage: Option<String>,
    /// Optional sequence number passed to the writer together with `source`.
    pub max_sequence: Option<SequenceNumber>,

    /// Index options, forwarded to the indexer builder.
    pub index_options: IndexOptions,
    /// Inverted index configuration, forwarded to the indexer builder.
    pub inverted_index_config: InvertedIndexConfig,
    /// Fulltext index configuration, forwarded to the indexer builder.
    pub fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration, forwarded to the indexer builder.
    pub bloom_filter_index_config: BloomFilterConfig,
}
377
/// Removes leftover files of a region from the atomic write dir of an object store.
pub(crate) struct TempFileCleaner {
    /// Region whose files are cleaned; part of the key used to match file names.
    region_id: RegionId,
    /// Object store whose atomic write dir is listed and cleaned.
    object_store: ObjectStore,
}
383
384impl TempFileCleaner {
385 pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
387 Self {
388 region_id,
389 object_store,
390 }
391 }
392
393 pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
395 let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
396 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
397
398 Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
399 }
400
401 pub(crate) async fn clean_atomic_dir_files(
403 local_store: &ObjectStore,
404 names_to_remove: &[&str],
405 ) {
406 let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
409 if e.kind() != ErrorKind::NotFound {
410 common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
411 }
412 }) else {
413 return;
414 };
415
416 let actual_files: Vec<_> = entries
419 .into_iter()
420 .filter_map(|entry| {
421 if entry.metadata().is_dir() {
422 return None;
423 }
424
425 let should_remove = names_to_remove
427 .iter()
428 .any(|file| entry.name().starts_with(file));
429 if should_remove {
430 Some(entry.path().to_string())
431 } else {
432 None
433 }
434 })
435 .collect();
436
437 common_telemetry::warn!(
438 "Clean files {:?} under atomic write dir for {:?}",
439 actual_files,
440 names_to_remove
441 );
442
443 if let Err(e) = local_store.delete_iter(actual_files).await {
444 common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
445 }
446 }
447}
448
449pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
450 let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
451 clean_dir(&atomic_write_dir).await?;
452
453 let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
455 clean_dir(&old_atomic_temp_dir).await?;
456
457 let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
458 let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
459
460 Ok(with_instrument_layers(store, false))
461}
462
463async fn clean_dir(dir: &str) -> Result<()> {
465 if tokio::fs::try_exists(dir)
466 .await
467 .context(CleanDirSnafu { dir })?
468 {
469 tokio::fs::remove_dir_all(dir)
470 .await
471 .context(CleanDirSnafu { dir })?;
472 }
473
474 Ok(())
475}
476
/// Builds the storage paths of the files that belong to an SST.
pub trait FilePathProvider: Send + Sync {
    /// Returns the path of the index file for `file_id`.
    fn build_index_file_path(&self, file_id: RegionFileId) -> String;

    /// Returns the path of the SST file for `file_id`.
    fn build_sst_file_path(&self, file_id: RegionFileId) -> String;
}
485
/// Path provider that resolves file paths through a file cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
    /// File cache that maps an index key to its cache file path.
    file_cache: FileCacheRef,
}
491
492impl WriteCachePathProvider {
493 pub fn new(file_cache: FileCacheRef) -> Self {
495 Self { file_cache }
496 }
497}
498
499impl FilePathProvider for WriteCachePathProvider {
500 fn build_index_file_path(&self, file_id: RegionFileId) -> String {
501 let puffin_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
502 self.file_cache.cache_file_path(puffin_key)
503 }
504
505 fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
506 let parquet_file_key =
507 IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
508 self.file_cache.cache_file_path(parquet_file_key)
509 }
510}
511
/// Path provider that builds file paths from a table directory.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
    /// Table directory the paths are built from.
    pub(crate) table_dir: String,
    /// Path type handed to the `location` helpers.
    pub(crate) path_type: PathType,
}
518
519impl RegionFilePathFactory {
520 pub fn new(table_dir: String, path_type: PathType) -> Self {
522 Self {
523 table_dir,
524 path_type,
525 }
526 }
527}
528
529impl FilePathProvider for RegionFilePathFactory {
530 fn build_index_file_path(&self, file_id: RegionFileId) -> String {
531 location::index_file_path(&self.table_dir, file_id, self.path_type)
532 }
533
534 fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
535 location::sst_file_path(&self.table_dir, file_id, self.path_type)
536 }
537}