1use std::sync::Arc;
16
17use object_store::services::Fs;
18use object_store::util::{join_dir, with_instrument_layers};
19use object_store::{ErrorKind, ObjectStore};
20use smallvec::SmallVec;
21use snafu::ResultExt;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
26use crate::cache::write_cache::SstUploadRequest;
27use crate::cache::CacheManagerRef;
28use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
29use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
30use crate::read::Source;
31use crate::region::options::IndexOptions;
32use crate::sst::file::{FileHandle, FileId, FileMeta};
33use crate::sst::index::intermediate::IntermediateManager;
34use crate::sst::index::puffin_manager::PuffinManagerFactory;
35use crate::sst::index::IndexerBuilderImpl;
36use crate::sst::location;
37use crate::sst::parquet::reader::ParquetReaderBuilder;
38use crate::sst::parquet::writer::ParquetWriter;
39use crate::sst::parquet::{SstInfo, WriteOptions};
40
41pub type AccessLayerRef = Arc<AccessLayer>;
42pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
44
/// Directory (relative to a store root) used for atomic writes.
///
/// In this file it is passed to the `Fs` builder as the atomic write dir and is the directory
/// listed when cleaning up temporary files.
pub const ATOMIC_WRITE_DIR: &str = "tmp/";
/// Older atomic write directory name.
///
/// In this file it is only removed when a file system cache store is created.
pub const OLD_ATOMIC_WRITE_DIR: &str = ".tmp/";
48
49pub struct AccessLayer {
51 region_dir: String,
52 object_store: ObjectStore,
54 puffin_manager_factory: PuffinManagerFactory,
56 intermediate_manager: IntermediateManager,
58}
59
60impl std::fmt::Debug for AccessLayer {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("AccessLayer")
63 .field("region_dir", &self.region_dir)
64 .finish()
65 }
66}
67
68impl AccessLayer {
69 pub fn new(
71 region_dir: impl Into<String>,
72 object_store: ObjectStore,
73 puffin_manager_factory: PuffinManagerFactory,
74 intermediate_manager: IntermediateManager,
75 ) -> AccessLayer {
76 AccessLayer {
77 region_dir: region_dir.into(),
78 object_store,
79 puffin_manager_factory,
80 intermediate_manager,
81 }
82 }
83
84 pub fn region_dir(&self) -> &str {
86 &self.region_dir
87 }
88
89 pub fn object_store(&self) -> &ObjectStore {
91 &self.object_store
92 }
93
94 pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
96 &self.puffin_manager_factory
97 }
98
99 pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
101 let path = location::sst_file_path(&self.region_dir, file_meta.file_id);
102 self.object_store
103 .delete(&path)
104 .await
105 .context(DeleteSstSnafu {
106 file_id: file_meta.file_id,
107 })?;
108
109 let path = location::index_file_path(&self.region_dir, file_meta.file_id);
110 self.object_store
111 .delete(&path)
112 .await
113 .context(DeleteIndexSnafu {
114 file_id: file_meta.file_id,
115 })?;
116
117 Ok(())
118 }
119
120 pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
122 ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
123 }
124
125 pub async fn write_sst(
129 &self,
130 request: SstWriteRequest,
131 write_opts: &WriteOptions,
132 ) -> Result<SstInfoArray> {
133 let region_id = request.metadata.region_id;
134 let cache_manager = request.cache_manager.clone();
135
136 let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
137 write_cache
139 .write_and_upload_sst(
140 request,
141 SstUploadRequest {
142 dest_path_provider: RegionFilePathFactory {
143 region_dir: self.region_dir.clone(),
144 },
145 remote_store: self.object_store.clone(),
146 },
147 write_opts,
148 )
149 .await?
150 } else {
151 let store = self.object_store.clone();
153 let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
154 let indexer_builder = IndexerBuilderImpl {
155 op_type: request.op_type,
156 metadata: request.metadata.clone(),
157 row_group_size: write_opts.row_group_size,
158 puffin_manager: self
159 .puffin_manager_factory
160 .build(store, path_provider.clone()),
161 intermediate_manager: self.intermediate_manager.clone(),
162 index_options: request.index_options,
163 inverted_index_config: request.inverted_index_config,
164 fulltext_index_config: request.fulltext_index_config,
165 bloom_filter_index_config: request.bloom_filter_index_config,
166 };
167 let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
171 let mut writer = ParquetWriter::new_with_object_store(
172 self.object_store.clone(),
173 request.metadata,
174 indexer_builder,
175 path_provider,
176 )
177 .await
178 .with_file_cleaner(cleaner);
179 writer
180 .write_all(request.source, request.max_sequence, write_opts)
181 .await?
182 };
183
184 if !sst_info.is_empty() {
186 for sst in &sst_info {
187 if let Some(parquet_metadata) = &sst.file_metadata {
188 cache_manager.put_parquet_meta_data(
189 region_id,
190 sst.file_id,
191 parquet_metadata.clone(),
192 )
193 }
194 }
195 }
196
197 Ok(sst_info)
198 }
199}
200
/// Kind of operation that triggers an SST write.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The SST is written by a flush.
    Flush,
    /// The SST is written by a compaction.
    Compact,
}
207
208pub struct SstWriteRequest {
210 pub op_type: OperationType,
211 pub metadata: RegionMetadataRef,
212 pub source: Source,
213 pub cache_manager: CacheManagerRef,
214 #[allow(dead_code)]
215 pub storage: Option<String>,
216 pub max_sequence: Option<SequenceNumber>,
217
218 pub index_options: IndexOptions,
220 pub inverted_index_config: InvertedIndexConfig,
221 pub fulltext_index_config: FulltextIndexConfig,
222 pub bloom_filter_index_config: BloomFilterConfig,
223}
224
225pub(crate) struct TempFileCleaner {
227 region_id: RegionId,
228 object_store: ObjectStore,
229}
230
231impl TempFileCleaner {
232 pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
234 Self {
235 region_id,
236 object_store,
237 }
238 }
239
240 pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
242 let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
243 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
244
245 Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
246 }
247
248 pub(crate) async fn clean_atomic_dir_files(
250 local_store: &ObjectStore,
251 names_to_remove: &[&str],
252 ) {
253 let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
256 if e.kind() != ErrorKind::NotFound {
257 common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
258 }
259 }) else {
260 return;
261 };
262
263 let actual_files: Vec<_> = entries
266 .into_iter()
267 .filter_map(|entry| {
268 if entry.metadata().is_dir() {
269 return None;
270 }
271
272 let should_remove = names_to_remove
274 .iter()
275 .any(|file| entry.name().starts_with(file));
276 if should_remove {
277 Some(entry.path().to_string())
278 } else {
279 None
280 }
281 })
282 .collect();
283
284 common_telemetry::warn!(
285 "Clean files {:?} under atomic write dir for {:?}",
286 actual_files,
287 names_to_remove
288 );
289
290 if let Err(e) = local_store.delete_iter(actual_files).await {
291 common_telemetry::error!(e; "Failed to delete tmp file for {:?}", names_to_remove);
292 }
293 }
294}
295
296pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
297 let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
298 clean_dir(&atomic_write_dir).await?;
299
300 let old_atomic_temp_dir = join_dir(root, OLD_ATOMIC_WRITE_DIR);
302 clean_dir(&old_atomic_temp_dir).await?;
303
304 let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
305 let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
306
307 Ok(with_instrument_layers(store, false))
308}
309
310async fn clean_dir(dir: &str) -> Result<()> {
312 if tokio::fs::try_exists(dir)
313 .await
314 .context(CleanDirSnafu { dir })?
315 {
316 tokio::fs::remove_dir_all(dir)
317 .await
318 .context(CleanDirSnafu { dir })?;
319 }
320
321 Ok(())
322}
323
324pub trait FilePathProvider: Send + Sync {
326 fn build_index_file_path(&self, file_id: FileId) -> String;
328
329 fn build_sst_file_path(&self, file_id: FileId) -> String;
331}
332
333#[derive(Clone)]
335pub(crate) struct WriteCachePathProvider {
336 region_id: RegionId,
337 file_cache: FileCacheRef,
338}
339
340impl WriteCachePathProvider {
341 pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
343 Self {
344 region_id,
345 file_cache,
346 }
347 }
348}
349
350impl FilePathProvider for WriteCachePathProvider {
351 fn build_index_file_path(&self, file_id: FileId) -> String {
352 let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
353 self.file_cache.cache_file_path(puffin_key)
354 }
355
356 fn build_sst_file_path(&self, file_id: FileId) -> String {
357 let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet);
358 self.file_cache.cache_file_path(parquet_file_key)
359 }
360}
361
/// [`FilePathProvider`] that resolves paths inside a region directory.
#[derive(Debug, Clone)]
pub(crate) struct RegionFilePathFactory {
    /// Region directory the file paths are built from.
    pub(crate) region_dir: String,
}
367
368impl RegionFilePathFactory {
369 pub fn new(region_dir: String) -> Self {
371 Self { region_dir }
372 }
373}
374
375impl FilePathProvider for RegionFilePathFactory {
376 fn build_index_file_path(&self, file_id: FileId) -> String {
377 location::index_file_path(&self.region_dir, file_id)
378 }
379
380 fn build_sst_file_path(&self, file_id: FileId) -> String {
381 location::sst_file_path(&self.region_dir, file_id)
382 }
383}