Skip to main content

mito2/manifest/storage/
delta.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use common_datasource::compression::CompressionType;
18use common_telemetry::debug;
19use futures::TryStreamExt;
20use futures::future::try_join_all;
21use object_store::{Entry, ErrorKind, Lister, ObjectStore};
22use snafu::{ResultExt, ensure};
23use store_api::ManifestVersion;
24use store_api::storage::RegionId;
25use tokio::sync::Semaphore;
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::error::{
29    CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result,
30};
31use crate::manifest::storage::size_tracker::Tracker;
32use crate::manifest::storage::utils::{
33    get_from_cache, put_to_cache, sort_manifests, write_and_put_cache,
34};
35use crate::manifest::storage::{
36    FETCH_MANIFEST_PARALLELISM, delta_file, file_compress_type, file_version, gen_path,
37    is_delta_file, list_start_after,
38};
39
/// Storage for the delta manifest files kept under a single directory (`path`).
///
/// `T` is the [`Tracker`] notified of the size of every delta file written by
/// `save`.
#[derive(Debug, Clone)]
pub(crate) struct DeltaStorage<T: Tracker> {
    /// Object store used to list, read, and write the delta files.
    object_store: ObjectStore,
    /// Codec used when writing delta files. Reads do not use this field; they
    /// derive the codec from each file's name via `file_compress_type`.
    compress_type: CompressionType,
    /// Directory that contains the delta manifest files.
    path: String,
    /// Receives `(version, encoded size in bytes)` for each saved delta file.
    delta_tracker: Arc<T>,
    /// Optional manifest cache. It is consulted (`get_from_cache`) before remote
    /// reads and filled via `put_to_cache` on reads and `write_and_put_cache`
    /// on saves.
    manifest_cache: Option<ManifestCache>,
}
48
49impl<T: Tracker> DeltaStorage<T> {
50    pub(crate) fn new(
51        path: String,
52        object_store: ObjectStore,
53        compress_type: CompressionType,
54        manifest_cache: Option<ManifestCache>,
55        delta_tracker: Arc<T>,
56    ) -> Self {
57        Self {
58            object_store,
59            compress_type,
60            path,
61            delta_tracker,
62            manifest_cache,
63        }
64    }
65
66    pub(crate) fn path(&self) -> &str {
67        &self.path
68    }
69
70    pub(crate) fn object_store(&self) -> &ObjectStore {
71        &self.object_store
72    }
73
74    fn delta_file_path(&self, version: ManifestVersion) -> String {
75        gen_path(&self.path, &delta_file(version), self.compress_type)
76    }
77
78    /// Returns an iterator of manifests from path directory.
79    ///
80    /// If `start_after` is `Some`, the lister will skip entries whose name is
81    /// lexicographically less than or equal to it (see OpenDAL's `start_after`).
82    pub(crate) async fn manifest_lister(
83        &self,
84        start_after: Option<&str>,
85    ) -> Result<Option<Lister>> {
86        let mut builder = self.object_store.lister_with(&self.path);
87        if let Some(s) = start_after {
88            builder = builder.start_after(s);
89        }
90        match builder.await {
91            Ok(streamer) => Ok(Some(streamer)),
92            Err(e) if e.kind() == ErrorKind::NotFound => {
93                debug!("Manifest directory does not exist: {}", self.path);
94                Ok(None)
95            }
96            Err(e) => Err(e).context(OpenDalSnafu)?,
97        }
98    }
99
100    /// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
101    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
102    /// Return an empty vector when directory is not found.
103    ///
104    /// `start_after` is forwarded to the underlying lister to skip entries
105    /// whose name is lexicographically less than or equal to it.
106    pub async fn get_paths<F, R>(&self, start_after: Option<&str>, mut filter: F) -> Result<Vec<R>>
107    where
108        F: FnMut(Entry) -> Option<R>,
109    {
110        let Some(streamer) = self.manifest_lister(start_after).await? else {
111            return Ok(vec![]);
112        };
113
114        streamer
115            .try_filter_map(|e| {
116                let result = filter(e);
117                async { Ok(result) }
118            })
119            .try_collect::<Vec<_>>()
120            .await
121            .context(OpenDalSnafu)
122    }
123
124    /// Scans the manifest files in the range of [start, end) and return all manifest entries.
125    pub async fn scan(
126        &self,
127        start: ManifestVersion,
128        end: ManifestVersion,
129    ) -> Result<Vec<(ManifestVersion, Entry)>> {
130        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
131
132        // Push the version lower bound into the list request via
133        // `list_start_after`; skip the hint when `start == 0` (nothing to skip).
134        let start_after = (start > 0).then(|| list_start_after(&self.path, start));
135        let mut total_paths = 0;
136        let mut entries: Vec<(ManifestVersion, Entry)> = self
137            .get_paths(start_after.as_deref(), |entry| {
138                total_paths += 1;
139                let file_name = entry.name();
140                if is_delta_file(file_name) {
141                    let version = file_version(file_name);
142                    if start <= version && version < end {
143                        return Some((version, entry));
144                    }
145                }
146                None
147            })
148            .await?;
149
150        sort_manifests(&mut entries);
151
152        common_telemetry::debug!(
153            "DeltaStorage get paths for {}, start: {}, end: {}, start_after: {:?}, total_paths: {}, entries: {}",
154            self.path,
155            start,
156            end,
157            start_after,
158            total_paths,
159            entries.len()
160        );
161
162        Ok(entries)
163    }
164
165    /// Fetches manifests in range [start_version, end_version).
166    ///
167    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
168    pub async fn fetch_manifests_strict_from(
169        &self,
170        start_version: ManifestVersion,
171        end_version: ManifestVersion,
172        region_id: RegionId,
173    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
174        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
175        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
176        debug!(
177            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
178            start_version,
179            end_version,
180            start_index,
181            region_id,
182            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
183        );
184        if let Some(start_index) = start_index {
185            Ok(manifests.split_off(start_index))
186        } else {
187            Ok(vec![])
188        }
189    }
190
191    /// Common implementation for fetching manifests from entries in parallel.
192    pub(crate) async fn fetch_manifests_from_entries(
193        &self,
194        entries: Vec<(ManifestVersion, Entry)>,
195    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
196        if entries.is_empty() {
197            return Ok(vec![]);
198        }
199
200        // TODO(weny): Make it configurable.
201        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
202
203        let tasks = entries.iter().map(|(v, entry)| async {
204            // Safety: semaphore must exist.
205            let _permit = semaphore.acquire().await.unwrap();
206
207            let cache_key = entry.path();
208            // Try to get from cache first
209            if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), cache_key).await {
210                return Ok((*v, data));
211            }
212
213            // Fetch from remote object store
214            let compress_type = file_compress_type(entry.name());
215            let bytes = self
216                .object_store
217                .read(entry.path())
218                .await
219                .context(OpenDalSnafu)?;
220            let data = compress_type
221                .decode(bytes)
222                .await
223                .context(DecompressObjectSnafu {
224                    compress_type,
225                    path: entry.path(),
226                })?;
227
228            // Add to cache
229            put_to_cache(self.manifest_cache.as_ref(), cache_key.to_string(), &data).await;
230
231            Ok((*v, data))
232        });
233
234        try_join_all(tasks).await
235    }
236
237    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
238    ///
239    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
240    /// Uses [fetch_manifests_strict_from](DeltaStorage::fetch_manifests_strict_from) to get manifests from the `start_version`.
241    pub async fn fetch_manifests(
242        &self,
243        start_version: ManifestVersion,
244        end_version: ManifestVersion,
245    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
246        let manifests = self.scan(start_version, end_version).await?;
247        self.fetch_manifests_from_entries(manifests).await
248    }
249
250    /// Save the delta manifest file.
251    pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
252        let path = self.delta_file_path(version);
253        debug!("Save log to manifest storage, version: {}", version);
254        let data = self
255            .compress_type
256            .encode(bytes)
257            .await
258            .context(CompressObjectSnafu {
259                compress_type: self.compress_type,
260                path: &path,
261            })?;
262        let delta_size = data.len();
263
264        write_and_put_cache(
265            &self.object_store,
266            self.manifest_cache.as_ref(),
267            &path,
268            data,
269        )
270        .await?;
271        self.delta_tracker.record(version, delta_size as u64);
272
273        Ok(())
274    }
275}
276
#[cfg(test)]
impl<T: Tracker> DeltaStorage<T> {
    /// Test-only setter (compiled under `cfg(test)`) that replaces the codec
    /// used to name and encode delta files written by subsequent `save` calls.
    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
        let Self {
            compress_type: current,
            ..
        } = self;
        *current = compress_type;
    }
}