1use std::sync::Arc;
16
17use object_store::services::Fs;
18use object_store::util::{join_dir, with_instrument_layers};
19use object_store::ObjectStore;
20use smallvec::SmallVec;
21use snafu::ResultExt;
22use store_api::metadata::RegionMetadataRef;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
26use crate::cache::write_cache::SstUploadRequest;
27use crate::cache::CacheManagerRef;
28use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
29use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
30use crate::read::Source;
31use crate::region::options::IndexOptions;
32use crate::sst::file::{FileHandle, FileId, FileMeta};
33use crate::sst::index::intermediate::IntermediateManager;
34use crate::sst::index::puffin_manager::PuffinManagerFactory;
35use crate::sst::index::IndexerBuilderImpl;
36use crate::sst::location;
37use crate::sst::parquet::reader::ParquetReaderBuilder;
38use crate::sst::parquet::writer::ParquetWriter;
39use crate::sst::parquet::{SstInfo, WriteOptions};
40
/// Shared, reference-counted handle to an [`AccessLayer`].
pub type AccessLayerRef = Arc<AccessLayer>;
/// Infos of the SST files produced by [`AccessLayer::write_sst`]; up to two entries are
/// stored inline before the `SmallVec` spills to the heap.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
44
/// Reads, writes and deletes the SST and index files of a single region.
pub struct AccessLayer {
    /// Directory of the region; SST and index file paths are built relative to it.
    region_dir: String,
    /// Object store that holds the region's files.
    object_store: ObjectStore,
    /// Factory used to build the puffin manager handed to the indexer when writing SSTs.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager handed to the indexer builder when writing SSTs.
    intermediate_manager: IntermediateManager,
}
55
56impl std::fmt::Debug for AccessLayer {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("AccessLayer")
59 .field("region_dir", &self.region_dir)
60 .finish()
61 }
62}
63
64impl AccessLayer {
65 pub fn new(
67 region_dir: impl Into<String>,
68 object_store: ObjectStore,
69 puffin_manager_factory: PuffinManagerFactory,
70 intermediate_manager: IntermediateManager,
71 ) -> AccessLayer {
72 AccessLayer {
73 region_dir: region_dir.into(),
74 object_store,
75 puffin_manager_factory,
76 intermediate_manager,
77 }
78 }
79
80 pub fn region_dir(&self) -> &str {
82 &self.region_dir
83 }
84
85 pub fn object_store(&self) -> &ObjectStore {
87 &self.object_store
88 }
89
90 pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
92 &self.puffin_manager_factory
93 }
94
95 pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
97 let path = location::sst_file_path(&self.region_dir, file_meta.file_id);
98 self.object_store
99 .delete(&path)
100 .await
101 .context(DeleteSstSnafu {
102 file_id: file_meta.file_id,
103 })?;
104
105 let path = location::index_file_path(&self.region_dir, file_meta.file_id);
106 self.object_store
107 .delete(&path)
108 .await
109 .context(DeleteIndexSnafu {
110 file_id: file_meta.file_id,
111 })?;
112
113 Ok(())
114 }
115
116 pub(crate) fn read_sst(&self, file: FileHandle) -> ParquetReaderBuilder {
118 ParquetReaderBuilder::new(self.region_dir.clone(), file, self.object_store.clone())
119 }
120
121 pub async fn write_sst(
125 &self,
126 request: SstWriteRequest,
127 write_opts: &WriteOptions,
128 ) -> Result<SstInfoArray> {
129 let region_id = request.metadata.region_id;
130 let cache_manager = request.cache_manager.clone();
131
132 let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
133 write_cache
135 .write_and_upload_sst(
136 request,
137 SstUploadRequest {
138 dest_path_provider: RegionFilePathFactory {
139 region_dir: self.region_dir.clone(),
140 },
141 remote_store: self.object_store.clone(),
142 },
143 write_opts,
144 )
145 .await?
146 } else {
147 let store = self.object_store.clone();
149 let path_provider = RegionFilePathFactory::new(self.region_dir.clone());
150 let indexer_builder = IndexerBuilderImpl {
151 op_type: request.op_type,
152 metadata: request.metadata.clone(),
153 row_group_size: write_opts.row_group_size,
154 puffin_manager: self
155 .puffin_manager_factory
156 .build(store, path_provider.clone()),
157 intermediate_manager: self.intermediate_manager.clone(),
158 index_options: request.index_options,
159 inverted_index_config: request.inverted_index_config,
160 fulltext_index_config: request.fulltext_index_config,
161 bloom_filter_index_config: request.bloom_filter_index_config,
162 };
163 let mut writer = ParquetWriter::new_with_object_store(
164 self.object_store.clone(),
165 request.metadata,
166 indexer_builder,
167 path_provider,
168 )
169 .await;
170 writer
171 .write_all(request.source, request.max_sequence, write_opts)
172 .await?
173 };
174
175 if !sst_info.is_empty() {
177 for sst in &sst_info {
178 if let Some(parquet_metadata) = &sst.file_metadata {
179 cache_manager.put_parquet_meta_data(
180 region_id,
181 sst.file_id,
182 parquet_metadata.clone(),
183 )
184 }
185 }
186 }
187
188 Ok(sst_info)
189 }
190}
191
/// Kind of operation an SST write belongs to; carried by [`SstWriteRequest::op_type`]
/// and passed to the indexer builder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The SST is written by a flush.
    Flush,
    /// The SST is written by a compaction.
    Compact,
}
198
/// Request to write SST files through [`AccessLayer::write_sst`].
pub struct SstWriteRequest {
    /// Operation the write belongs to; passed to the indexer builder.
    pub op_type: OperationType,
    /// Metadata of the region being written.
    pub metadata: RegionMetadataRef,
    /// Source of the rows to write.
    pub source: Source,
    /// Cache manager: its write cache (if any) selects the write path, and the parquet
    /// metadata of the written files is put into it.
    pub cache_manager: CacheManagerRef,
    // NOTE(review): `storage` is not read anywhere in this file, which is presumably why
    // `dead_code` is allowed here — confirm whether the field is still needed.
    #[allow(dead_code)]
    pub storage: Option<String>,
    /// Max sequence number passed to the SST writer.
    pub max_sequence: Option<SequenceNumber>,

    /// Index options passed to the indexer builder.
    pub index_options: IndexOptions,
    /// Inverted index configuration passed to the indexer builder.
    pub inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration passed to the indexer builder.
    pub fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration passed to the indexer builder.
    pub bloom_filter_index_config: BloomFilterConfig,
}
215
216pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
217 let atomic_write_dir = join_dir(root, ".tmp/");
218 clean_dir(&atomic_write_dir).await?;
219
220 let builder = Fs::default().root(root).atomic_write_dir(&atomic_write_dir);
221 let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
222
223 Ok(with_instrument_layers(store, false))
224}
225
226async fn clean_dir(dir: &str) -> Result<()> {
228 if tokio::fs::try_exists(dir)
229 .await
230 .context(CleanDirSnafu { dir })?
231 {
232 tokio::fs::remove_dir_all(dir)
233 .await
234 .context(CleanDirSnafu { dir })?;
235 }
236
237 Ok(())
238}
239
/// Builds the paths of the SST and index files for a file id.
pub trait FilePathProvider: Send + Sync {
    /// Returns the path of the index file of `file_id`.
    fn build_index_file_path(&self, file_id: FileId) -> String;

    /// Returns the path of the SST file of `file_id`.
    fn build_sst_file_path(&self, file_id: FileId) -> String;
}
248
/// [`FilePathProvider`] that resolves file paths through a region's file cache.
#[derive(Clone)]
pub(crate) struct WriteCachePathProvider {
    /// Region the files belong to; used to build the cache keys.
    region_id: RegionId,
    /// File cache that turns keys into file paths.
    file_cache: FileCacheRef,
}
255
256impl WriteCachePathProvider {
257 pub fn new(region_id: RegionId, file_cache: FileCacheRef) -> Self {
259 Self {
260 region_id,
261 file_cache,
262 }
263 }
264}
265
266impl FilePathProvider for WriteCachePathProvider {
267 fn build_index_file_path(&self, file_id: FileId) -> String {
268 let puffin_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
269 self.file_cache.cache_file_path(puffin_key)
270 }
271
272 fn build_sst_file_path(&self, file_id: FileId) -> String {
273 let parquet_file_key = IndexKey::new(self.region_id, file_id, FileType::Parquet);
274 self.file_cache.cache_file_path(parquet_file_key)
275 }
276}
277
/// [`FilePathProvider`] that places SST and index files under a region directory.
#[derive(Clone, Debug)]
pub(crate) struct RegionFilePathFactory {
    /// Directory of the region the files belong to.
    region_dir: String,
}
283
284impl RegionFilePathFactory {
285 pub fn new(region_dir: String) -> Self {
287 Self { region_dir }
288 }
289}
290
291impl FilePathProvider for RegionFilePathFactory {
292 fn build_index_file_path(&self, file_id: FileId) -> String {
293 location::index_file_path(&self.region_dir, file_id)
294 }
295
296 fn build_sst_file_path(&self, file_id: FileId) -> String {
297 location::sst_file_path(&self.region_dir, file_id)
298 }
299}