1use std::sync::Arc;
16
17use common_datasource::compression::CompressionType;
18use common_telemetry::debug;
19use futures::TryStreamExt;
20use futures::future::try_join_all;
21use object_store::{Entry, ErrorKind, Lister, ObjectStore};
22use snafu::{ResultExt, ensure};
23use store_api::ManifestVersion;
24use store_api::storage::RegionId;
25use tokio::sync::Semaphore;
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::error::{
29 CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result,
30};
31use crate::manifest::storage::size_tracker::Tracker;
32use crate::manifest::storage::utils::{
33 get_from_cache, put_to_cache, sort_manifests, write_and_put_cache,
34};
35use crate::manifest::storage::{
36 FETCH_MANIFEST_PARALLELISM, delta_file, file_compress_type, file_version, gen_path,
37 is_delta_file, list_start_after,
38};
39
/// Storage for manifest delta files that live under a single manifest directory.
#[derive(Debug, Clone)]
pub(crate) struct DeltaStorage<T: Tracker> {
    /// Object store that holds the delta files.
    object_store: ObjectStore,
    /// Compression applied when writing new delta files. Reads instead derive the
    /// compression of each file from its file name.
    compress_type: CompressionType,
    /// Manifest directory; delta files are listed and written under this path.
    path: String,
    /// Receives the compressed size of every saved delta version.
    delta_tracker: Arc<T>,
    /// Optional cache that is consulted before reading from the object store, and
    /// populated after reads and writes.
    manifest_cache: Option<ManifestCache>,
}
48
49impl<T: Tracker> DeltaStorage<T> {
50 pub(crate) fn new(
51 path: String,
52 object_store: ObjectStore,
53 compress_type: CompressionType,
54 manifest_cache: Option<ManifestCache>,
55 delta_tracker: Arc<T>,
56 ) -> Self {
57 Self {
58 object_store,
59 compress_type,
60 path,
61 delta_tracker,
62 manifest_cache,
63 }
64 }
65
66 pub(crate) fn path(&self) -> &str {
67 &self.path
68 }
69
70 pub(crate) fn object_store(&self) -> &ObjectStore {
71 &self.object_store
72 }
73
74 fn delta_file_path(&self, version: ManifestVersion) -> String {
75 gen_path(&self.path, &delta_file(version), self.compress_type)
76 }
77
78 pub(crate) async fn manifest_lister(
83 &self,
84 start_after: Option<&str>,
85 ) -> Result<Option<Lister>> {
86 let mut builder = self.object_store.lister_with(&self.path);
87 if let Some(s) = start_after {
88 builder = builder.start_after(s);
89 }
90 match builder.await {
91 Ok(streamer) => Ok(Some(streamer)),
92 Err(e) if e.kind() == ErrorKind::NotFound => {
93 debug!("Manifest directory does not exist: {}", self.path);
94 Ok(None)
95 }
96 Err(e) => Err(e).context(OpenDalSnafu)?,
97 }
98 }
99
100 pub async fn get_paths<F, R>(&self, start_after: Option<&str>, mut filter: F) -> Result<Vec<R>>
107 where
108 F: FnMut(Entry) -> Option<R>,
109 {
110 let Some(streamer) = self.manifest_lister(start_after).await? else {
111 return Ok(vec![]);
112 };
113
114 streamer
115 .try_filter_map(|e| {
116 let result = filter(e);
117 async { Ok(result) }
118 })
119 .try_collect::<Vec<_>>()
120 .await
121 .context(OpenDalSnafu)
122 }
123
124 pub async fn scan(
126 &self,
127 start: ManifestVersion,
128 end: ManifestVersion,
129 ) -> Result<Vec<(ManifestVersion, Entry)>> {
130 ensure!(start <= end, InvalidScanIndexSnafu { start, end });
131
132 let start_after = (start > 0).then(|| list_start_after(&self.path, start));
135 let mut total_paths = 0;
136 let mut entries: Vec<(ManifestVersion, Entry)> = self
137 .get_paths(start_after.as_deref(), |entry| {
138 total_paths += 1;
139 let file_name = entry.name();
140 if is_delta_file(file_name) {
141 let version = file_version(file_name);
142 if start <= version && version < end {
143 return Some((version, entry));
144 }
145 }
146 None
147 })
148 .await?;
149
150 sort_manifests(&mut entries);
151
152 common_telemetry::debug!(
153 "DeltaStorage get paths for {}, start: {}, end: {}, start_after: {:?}, total_paths: {}, entries: {}",
154 self.path,
155 start,
156 end,
157 start_after,
158 total_paths,
159 entries.len()
160 );
161
162 Ok(entries)
163 }
164
165 pub async fn fetch_manifests_strict_from(
169 &self,
170 start_version: ManifestVersion,
171 end_version: ManifestVersion,
172 region_id: RegionId,
173 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
174 let mut manifests = self.fetch_manifests(start_version, end_version).await?;
175 let start_index = manifests.iter().position(|(v, _)| *v == start_version);
176 debug!(
177 "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
178 start_version,
179 end_version,
180 start_index,
181 region_id,
182 manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
183 );
184 if let Some(start_index) = start_index {
185 Ok(manifests.split_off(start_index))
186 } else {
187 Ok(vec![])
188 }
189 }
190
191 pub(crate) async fn fetch_manifests_from_entries(
193 &self,
194 entries: Vec<(ManifestVersion, Entry)>,
195 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
196 if entries.is_empty() {
197 return Ok(vec![]);
198 }
199
200 let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
202
203 let tasks = entries.iter().map(|(v, entry)| async {
204 let _permit = semaphore.acquire().await.unwrap();
206
207 let cache_key = entry.path();
208 if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), cache_key).await {
210 return Ok((*v, data));
211 }
212
213 let compress_type = file_compress_type(entry.name());
215 let bytes = self
216 .object_store
217 .read(entry.path())
218 .await
219 .context(OpenDalSnafu)?;
220 let data = compress_type
221 .decode(bytes)
222 .await
223 .context(DecompressObjectSnafu {
224 compress_type,
225 path: entry.path(),
226 })?;
227
228 put_to_cache(self.manifest_cache.as_ref(), cache_key.to_string(), &data).await;
230
231 Ok((*v, data))
232 });
233
234 try_join_all(tasks).await
235 }
236
237 pub async fn fetch_manifests(
242 &self,
243 start_version: ManifestVersion,
244 end_version: ManifestVersion,
245 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
246 let manifests = self.scan(start_version, end_version).await?;
247 self.fetch_manifests_from_entries(manifests).await
248 }
249
250 pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
252 let path = self.delta_file_path(version);
253 debug!("Save log to manifest storage, version: {}", version);
254 let data = self
255 .compress_type
256 .encode(bytes)
257 .await
258 .context(CompressObjectSnafu {
259 compress_type: self.compress_type,
260 path: &path,
261 })?;
262 let delta_size = data.len();
263
264 write_and_put_cache(
265 &self.object_store,
266 self.manifest_cache.as_ref(),
267 &path,
268 data,
269 )
270 .await?;
271 self.delta_tracker.record(version, delta_size as u64);
272
273 Ok(())
274 }
275}
276
#[cfg(test)]
impl<T: Tracker> DeltaStorage<T> {
    /// Test-only hook: changes the compression used for subsequently saved deltas.
    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
        let Self {
            compress_type: current,
            ..
        } = self;
        *current = compress_type;
    }
}