1use std::sync::Arc;
16
17use common_datasource::compression::CompressionType;
18use common_telemetry::debug;
19use futures::TryStreamExt;
20use futures::future::try_join_all;
21use object_store::{Entry, ErrorKind, Lister, ObjectStore};
22use snafu::{ResultExt, ensure};
23use store_api::ManifestVersion;
24use store_api::storage::RegionId;
25use tokio::sync::Semaphore;
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::error::{
29 CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result,
30};
31use crate::manifest::storage::size_tracker::Tracker;
32use crate::manifest::storage::utils::{
33 get_from_cache, put_to_cache, sort_manifests, write_and_put_cache,
34};
35use crate::manifest::storage::{
36 FETCH_MANIFEST_PARALLELISM, delta_file, file_compress_type, file_version, gen_path,
37 is_delta_file,
38};
39
/// Reads and writes manifest delta files stored under a single directory of
/// an object store, optionally fronted by a manifest cache.
#[derive(Debug, Clone)]
pub(crate) struct DeltaStorage<T: Tracker> {
    /// Object store holding the delta files.
    object_store: ObjectStore,
    /// Codec used to compress (and name) newly written delta files.
    compress_type: CompressionType,
    /// Directory that contains the delta files; listed by `scan`.
    path: String,
    /// Receives the compressed size of every delta file written by `save`.
    delta_tracker: Arc<T>,
    /// Optional cache consulted before reading from, and populated after
    /// reading from / writing to, the object store.
    manifest_cache: Option<ManifestCache>,
}
48
49impl<T: Tracker> DeltaStorage<T> {
50 pub(crate) fn new(
51 path: String,
52 object_store: ObjectStore,
53 compress_type: CompressionType,
54 manifest_cache: Option<ManifestCache>,
55 delta_tracker: Arc<T>,
56 ) -> Self {
57 Self {
58 object_store,
59 compress_type,
60 path,
61 delta_tracker,
62 manifest_cache,
63 }
64 }
65
66 pub(crate) fn path(&self) -> &str {
67 &self.path
68 }
69
70 pub(crate) fn object_store(&self) -> &ObjectStore {
71 &self.object_store
72 }
73
74 fn delta_file_path(&self, version: ManifestVersion) -> String {
75 gen_path(&self.path, &delta_file(version), self.compress_type)
76 }
77
78 pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
80 match self.object_store.lister_with(&self.path).await {
81 Ok(streamer) => Ok(Some(streamer)),
82 Err(e) if e.kind() == ErrorKind::NotFound => {
83 debug!("Manifest directory does not exist: {}", self.path);
84 Ok(None)
85 }
86 Err(e) => Err(e).context(OpenDalSnafu)?,
87 }
88 }
89
90 pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
94 where
95 F: Fn(Entry) -> Option<R>,
96 {
97 let Some(streamer) = self.manifest_lister().await? else {
98 return Ok(vec![]);
99 };
100
101 streamer
102 .try_filter_map(|e| async { Ok(filter(e)) })
103 .try_collect::<Vec<_>>()
104 .await
105 .context(OpenDalSnafu)
106 }
107
108 pub async fn scan(
110 &self,
111 start: ManifestVersion,
112 end: ManifestVersion,
113 ) -> Result<Vec<(ManifestVersion, Entry)>> {
114 ensure!(start <= end, InvalidScanIndexSnafu { start, end });
115
116 let mut entries: Vec<(ManifestVersion, Entry)> = self
117 .get_paths(|entry| {
118 let file_name = entry.name();
119 if is_delta_file(file_name) {
120 let version = file_version(file_name);
121 if start <= version && version < end {
122 return Some((version, entry));
123 }
124 }
125 None
126 })
127 .await?;
128
129 sort_manifests(&mut entries);
130
131 Ok(entries)
132 }
133
134 pub async fn fetch_manifests_strict_from(
138 &self,
139 start_version: ManifestVersion,
140 end_version: ManifestVersion,
141 region_id: RegionId,
142 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
143 let mut manifests = self.fetch_manifests(start_version, end_version).await?;
144 let start_index = manifests.iter().position(|(v, _)| *v == start_version);
145 debug!(
146 "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
147 start_version,
148 end_version,
149 start_index,
150 region_id,
151 manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
152 );
153 if let Some(start_index) = start_index {
154 Ok(manifests.split_off(start_index))
155 } else {
156 Ok(vec![])
157 }
158 }
159
160 pub(crate) async fn fetch_manifests_from_entries(
162 &self,
163 entries: Vec<(ManifestVersion, Entry)>,
164 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
165 if entries.is_empty() {
166 return Ok(vec![]);
167 }
168
169 let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
171
172 let tasks = entries.iter().map(|(v, entry)| async {
173 let _permit = semaphore.acquire().await.unwrap();
175
176 let cache_key = entry.path();
177 if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), cache_key).await {
179 return Ok((*v, data));
180 }
181
182 let compress_type = file_compress_type(entry.name());
184 let bytes = self
185 .object_store
186 .read(entry.path())
187 .await
188 .context(OpenDalSnafu)?;
189 let data = compress_type
190 .decode(bytes)
191 .await
192 .context(DecompressObjectSnafu {
193 compress_type,
194 path: entry.path(),
195 })?;
196
197 put_to_cache(self.manifest_cache.as_ref(), cache_key.to_string(), &data).await;
199
200 Ok((*v, data))
201 });
202
203 try_join_all(tasks).await
204 }
205
206 pub async fn fetch_manifests(
211 &self,
212 start_version: ManifestVersion,
213 end_version: ManifestVersion,
214 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
215 let manifests = self.scan(start_version, end_version).await?;
216 self.fetch_manifests_from_entries(manifests).await
217 }
218
219 pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
221 let path = self.delta_file_path(version);
222 debug!("Save log to manifest storage, version: {}", version);
223 let data = self
224 .compress_type
225 .encode(bytes)
226 .await
227 .context(CompressObjectSnafu {
228 compress_type: self.compress_type,
229 path: &path,
230 })?;
231 let delta_size = data.len();
232
233 write_and_put_cache(
234 &self.object_store,
235 self.manifest_cache.as_ref(),
236 &path,
237 data,
238 )
239 .await?;
240 self.delta_tracker.record(version, delta_size as u64);
241
242 Ok(())
243 }
244}
245
#[cfg(test)]
impl<T: Tracker> DeltaStorage<T> {
    /// Test-only: replaces the codec used to name and encode delta files
    /// written afterwards by `save`.
    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
        let slot = &mut self.compress_type;
        *slot = compress_type;
    }
}