mito2/manifest/storage/
delta.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_datasource::compression::CompressionType;
18use common_telemetry::debug;
19use futures::TryStreamExt;
20use futures::future::try_join_all;
21use object_store::{Entry, ErrorKind, Lister, ObjectStore};
22use snafu::{ResultExt, ensure};
23use store_api::ManifestVersion;
24use store_api::storage::RegionId;
25use tokio::sync::Semaphore;
26
27use crate::cache::manifest_cache::ManifestCache;
28use crate::error::{
29    CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result,
30};
31use crate::manifest::storage::size_tracker::Tracker;
32use crate::manifest::storage::utils::{
33    get_from_cache, put_to_cache, sort_manifests, write_and_put_cache,
34};
35use crate::manifest::storage::{
36    FETCH_MANIFEST_PARALLELISM, delta_file, file_compress_type, file_version, gen_path,
37    is_delta_file,
38};
39
40#[derive(Debug, Clone)]
41pub(crate) struct DeltaStorage<T: Tracker> {
42    object_store: ObjectStore,
43    compress_type: CompressionType,
44    path: String,
45    delta_tracker: Arc<T>,
46    manifest_cache: Option<ManifestCache>,
47}
48
49impl<T: Tracker> DeltaStorage<T> {
50    pub(crate) fn new(
51        path: String,
52        object_store: ObjectStore,
53        compress_type: CompressionType,
54        manifest_cache: Option<ManifestCache>,
55        delta_tracker: Arc<T>,
56    ) -> Self {
57        Self {
58            object_store,
59            compress_type,
60            path,
61            delta_tracker,
62            manifest_cache,
63        }
64    }
65
66    pub(crate) fn path(&self) -> &str {
67        &self.path
68    }
69
70    pub(crate) fn object_store(&self) -> &ObjectStore {
71        &self.object_store
72    }
73
74    fn delta_file_path(&self, version: ManifestVersion) -> String {
75        gen_path(&self.path, &delta_file(version), self.compress_type)
76    }
77
78    /// Returns an iterator of manifests from path directory.
79    pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
80        match self.object_store.lister_with(&self.path).await {
81            Ok(streamer) => Ok(Some(streamer)),
82            Err(e) if e.kind() == ErrorKind::NotFound => {
83                debug!("Manifest directory does not exist: {}", self.path);
84                Ok(None)
85            }
86            Err(e) => Err(e).context(OpenDalSnafu)?,
87        }
88    }
89
90    /// Return all `R`s in the directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
91    /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
92    /// Return an empty vector when directory is not found.
93    pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
94    where
95        F: Fn(Entry) -> Option<R>,
96    {
97        let Some(streamer) = self.manifest_lister().await? else {
98            return Ok(vec![]);
99        };
100
101        streamer
102            .try_filter_map(|e| async { Ok(filter(e)) })
103            .try_collect::<Vec<_>>()
104            .await
105            .context(OpenDalSnafu)
106    }
107
108    /// Scans the manifest files in the range of [start, end) and return all manifest entries.
109    pub async fn scan(
110        &self,
111        start: ManifestVersion,
112        end: ManifestVersion,
113    ) -> Result<Vec<(ManifestVersion, Entry)>> {
114        ensure!(start <= end, InvalidScanIndexSnafu { start, end });
115
116        let mut entries: Vec<(ManifestVersion, Entry)> = self
117            .get_paths(|entry| {
118                let file_name = entry.name();
119                if is_delta_file(file_name) {
120                    let version = file_version(file_name);
121                    if start <= version && version < end {
122                        return Some((version, entry));
123                    }
124                }
125                None
126            })
127            .await?;
128
129        sort_manifests(&mut entries);
130
131        Ok(entries)
132    }
133
134    /// Fetches manifests in range [start_version, end_version).
135    ///
136    /// This functions is guaranteed to return manifests from the `start_version` strictly (must contain `start_version`).
137    pub async fn fetch_manifests_strict_from(
138        &self,
139        start_version: ManifestVersion,
140        end_version: ManifestVersion,
141        region_id: RegionId,
142    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
143        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
144        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
145        debug!(
146            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
147            start_version,
148            end_version,
149            start_index,
150            region_id,
151            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
152        );
153        if let Some(start_index) = start_index {
154            Ok(manifests.split_off(start_index))
155        } else {
156            Ok(vec![])
157        }
158    }
159
160    /// Common implementation for fetching manifests from entries in parallel.
161    pub(crate) async fn fetch_manifests_from_entries(
162        &self,
163        entries: Vec<(ManifestVersion, Entry)>,
164    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
165        if entries.is_empty() {
166            return Ok(vec![]);
167        }
168
169        // TODO(weny): Make it configurable.
170        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
171
172        let tasks = entries.iter().map(|(v, entry)| async {
173            // Safety: semaphore must exist.
174            let _permit = semaphore.acquire().await.unwrap();
175
176            let cache_key = entry.path();
177            // Try to get from cache first
178            if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), cache_key).await {
179                return Ok((*v, data));
180            }
181
182            // Fetch from remote object store
183            let compress_type = file_compress_type(entry.name());
184            let bytes = self
185                .object_store
186                .read(entry.path())
187                .await
188                .context(OpenDalSnafu)?;
189            let data = compress_type
190                .decode(bytes)
191                .await
192                .context(DecompressObjectSnafu {
193                    compress_type,
194                    path: entry.path(),
195                })?;
196
197            // Add to cache
198            put_to_cache(self.manifest_cache.as_ref(), cache_key.to_string(), &data).await;
199
200            Ok((*v, data))
201        });
202
203        try_join_all(tasks).await
204    }
205
206    /// Fetch all manifests in concurrent, and return the manifests in range [start_version, end_version)
207    ///
208    /// **Notes**: This function is no guarantee to return manifests from the `start_version` strictly.
209    /// Uses [fetch_manifests_strict_from](DeltaStorage::fetch_manifests_strict_from) to get manifests from the `start_version`.
210    pub async fn fetch_manifests(
211        &self,
212        start_version: ManifestVersion,
213        end_version: ManifestVersion,
214    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
215        let manifests = self.scan(start_version, end_version).await?;
216        self.fetch_manifests_from_entries(manifests).await
217    }
218
219    /// Save the delta manifest file.
220    pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
221        let path = self.delta_file_path(version);
222        debug!("Save log to manifest storage, version: {}", version);
223        let data = self
224            .compress_type
225            .encode(bytes)
226            .await
227            .context(CompressObjectSnafu {
228                compress_type: self.compress_type,
229                path: &path,
230            })?;
231        let delta_size = data.len();
232
233        write_and_put_cache(
234            &self.object_store,
235            self.manifest_cache.as_ref(),
236            &path,
237            data,
238        )
239        .await?;
240        self.delta_tracker.record(version, delta_size as u64);
241
242        Ok(())
243    }
244}
245
#[cfg(test)]
impl<T> DeltaStorage<T>
where
    T: Tracker,
{
    /// Test-only helper: replaces the compression type used when writing new
    /// delta files (and when building their paths).
    pub fn set_compress_type(&mut self, compress_type: CompressionType) {
        let Self {
            compress_type: current,
            ..
        } = self;
        *current = compress_type;
    }
}