1use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::TryStreamExt;
25use futures::future::try_join_all;
26use lazy_static::lazy_static;
27use object_store::util::join_dir;
28use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
29use regex::Regex;
30use serde::{Deserialize, Serialize};
31use snafu::{ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::storage::RegionId;
34use tokio::sync::Semaphore;
35
36use crate::cache::manifest_cache::ManifestCache;
37use crate::error::{
38 ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
39 OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
40};
41
42lazy_static! {
43 static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
44 static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
45}
46
47const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
48const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
49const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
52const FETCH_MANIFEST_PARALLELISM: usize = 16;
53
54pub fn manifest_dir(region_dir: &str) -> String {
56 join_dir(region_dir, "manifest")
57}
58
59pub const fn manifest_compress_type(compress: bool) -> CompressionType {
61 if compress {
62 DEFAULT_MANIFEST_COMPRESSION_TYPE
63 } else {
64 FALL_BACK_COMPRESS_TYPE
65 }
66}
67
68pub fn delta_file(version: ManifestVersion) -> String {
69 format!("{version:020}.json")
70}
71
72pub fn checkpoint_file(version: ManifestVersion) -> String {
73 format!("{version:020}.checkpoint")
74}
75
76pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
77 if compress_type == CompressionType::Uncompressed {
78 format!("{}{}", path, file)
79 } else {
80 format!("{}{}.{}", path, file, compress_type.file_extension())
81 }
82}
83
84fn checkpoint_checksum(data: &[u8]) -> u32 {
85 let mut hasher = Hasher::new();
86 hasher.update(data);
87 hasher.finalize()
88}
89
90fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
91 if let Some(checksum) = wanted {
92 let calculated_checksum = checkpoint_checksum(data);
93 ensure!(
94 checksum == calculated_checksum,
95 ChecksumMismatchSnafu {
96 actual: calculated_checksum,
97 expected: checksum,
98 }
99 );
100 }
101 Ok(())
102}
103
104pub fn file_version(path: &str) -> ManifestVersion {
109 let s = path.split('.').next().unwrap();
110 s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
111}
112
113pub fn file_compress_type(path: &str) -> CompressionType {
118 let s = path.rsplit('.').next().unwrap_or("");
119 CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
120}
121
122pub fn is_delta_file(file_name: &str) -> bool {
123 DELTA_RE.is_match(file_name)
124}
125
126pub fn is_checkpoint_file(file_name: &str) -> bool {
127 CHECKPOINT_RE.is_match(file_name)
128}
129
130#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
132enum FileKey {
133 Delta(ManifestVersion),
135 Checkpoint(ManifestVersion),
137}
138
/// Reads and writes a region's manifest files (delta files, checkpoint files and the
/// `_last_checkpoint` metadata file) in an object store, tracking their sizes.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Object store holding the manifest files.
    object_store: ObjectStore,
    /// Compression used when encoding new files and for checkpoint file names.
    compress_type: CompressionType,
    /// Normalized manifest directory.
    path: String,
    /// Normalized directory for staging delta files.
    staging_path: String,
    /// Stored (encoded) size of each tracked delta/checkpoint file, keyed by kind and version.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Running total of tracked file sizes; provided by the caller, presumably so the
    /// owner can observe it — TODO confirm whether it is shared between stores.
    total_manifest_size: Arc<AtomicU64>,
    /// Optional cache of manifest file contents, keyed by file path.
    manifest_cache: Option<ManifestCache>,
}
151
152impl ManifestObjectStore {
153 pub fn new(
154 path: &str,
155 object_store: ObjectStore,
156 compress_type: CompressionType,
157 total_manifest_size: Arc<AtomicU64>,
158 manifest_cache: Option<ManifestCache>,
159 ) -> Self {
160 let path = util::normalize_dir(path);
161 let staging_path = {
162 let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
164 util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
165 };
166 Self {
167 object_store,
168 compress_type,
169 path,
170 staging_path,
171 manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
172 total_manifest_size,
173 manifest_cache,
174 }
175 }
176
177 fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
179 let base_path = if is_staging {
180 &self.staging_path
181 } else {
182 &self.path
183 };
184 gen_path(base_path, &delta_file(version), self.compress_type)
185 }
186
187 fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
189 gen_path(&self.path, &checkpoint_file(version), self.compress_type)
190 }
191
192 pub(crate) fn last_checkpoint_path(&self) -> String {
195 format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
196 }
197
198 pub(crate) fn manifest_dir(&self) -> &str {
200 &self.path
201 }
202
203 pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
205 let path = if is_staging {
206 &self.staging_path
207 } else {
208 &self.path
209 };
210 match self.object_store.lister_with(path).await {
211 Ok(streamer) => Ok(Some(streamer)),
212 Err(e) if e.kind() == ErrorKind::NotFound => {
213 debug!("Manifest directory does not exist: {}", path);
214 Ok(None)
215 }
216 Err(e) => Err(e).context(OpenDalSnafu)?,
217 }
218 }
219
220 pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
224 where
225 F: Fn(Entry) -> Option<R>,
226 {
227 let Some(streamer) = self.manifest_lister(is_staging).await? else {
228 return Ok(vec![]);
229 };
230
231 streamer
232 .try_filter_map(|e| async { Ok(filter(e)) })
233 .try_collect::<Vec<_>>()
234 .await
235 .context(OpenDalSnafu)
236 }
237
238 fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
240 entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
241 }
242
243 pub async fn scan(
245 &self,
246 start: ManifestVersion,
247 end: ManifestVersion,
248 ) -> Result<Vec<(ManifestVersion, Entry)>> {
249 ensure!(start <= end, InvalidScanIndexSnafu { start, end });
250
251 let mut entries: Vec<(ManifestVersion, Entry)> = self
252 .get_paths(
253 |entry| {
254 let file_name = entry.name();
255 if is_delta_file(file_name) {
256 let version = file_version(file_name);
257 if start <= version && version < end {
258 return Some((version, entry));
259 }
260 }
261 None
262 },
263 false,
264 )
265 .await?;
266
267 Self::sort_manifests(&mut entries);
268
269 Ok(entries)
270 }
271
272 pub async fn fetch_manifests_strict_from(
276 &self,
277 start_version: ManifestVersion,
278 end_version: ManifestVersion,
279 region_id: RegionId,
280 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
281 let mut manifests = self.fetch_manifests(start_version, end_version).await?;
282 let start_index = manifests.iter().position(|(v, _)| *v == start_version);
283 debug!(
284 "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
285 start_version,
286 end_version,
287 start_index,
288 region_id,
289 manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
290 );
291 if let Some(start_index) = start_index {
292 Ok(manifests.split_off(start_index))
293 } else {
294 Ok(vec![])
295 }
296 }
297
298 async fn fetch_manifests_from_entries(
301 &self,
302 entries: Vec<(ManifestVersion, Entry)>,
303 is_staging: bool,
304 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
305 if entries.is_empty() {
306 return Ok(vec![]);
307 }
308
309 let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
311
312 let tasks = entries.iter().map(|(v, entry)| async {
313 let _permit = semaphore.acquire().await.unwrap();
315
316 let cache_key = entry.path();
317 if let Some(data) = self.get_from_cache(cache_key, is_staging).await {
319 return Ok((*v, data));
320 }
321
322 let compress_type = file_compress_type(entry.name());
324 let bytes = self
325 .object_store
326 .read(entry.path())
327 .await
328 .context(OpenDalSnafu)?;
329 let data = compress_type
330 .decode(bytes)
331 .await
332 .context(DecompressObjectSnafu {
333 compress_type,
334 path: entry.path(),
335 })?;
336
337 self.put_to_cache(cache_key.to_string(), &data, is_staging)
339 .await;
340
341 Ok((*v, data))
342 });
343
344 try_join_all(tasks).await
345 }
346
347 pub async fn fetch_manifests(
352 &self,
353 start_version: ManifestVersion,
354 end_version: ManifestVersion,
355 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
356 let manifests = self.scan(start_version, end_version).await?;
357 self.fetch_manifests_from_entries(manifests, false).await
358 }
359
360 pub async fn delete_until(
365 &self,
366 end: ManifestVersion,
367 keep_last_checkpoint: bool,
368 ) -> Result<usize> {
369 let entries: Vec<_> = self
371 .get_paths(
372 |entry| {
373 let file_name = entry.name();
374 let is_checkpoint = is_checkpoint_file(file_name);
375 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
376 let version = file_version(file_name);
377 if version < end {
378 return Some((entry, is_checkpoint, version));
379 }
380 }
381 None
382 },
383 false,
384 )
385 .await?;
386 let checkpoint_version = if keep_last_checkpoint {
387 entries
389 .iter()
390 .filter_map(
391 |(_e, is_checkpoint, version)| {
392 if *is_checkpoint { Some(version) } else { None }
393 },
394 )
395 .max()
396 } else {
397 None
398 };
399 let del_entries: Vec<_> = entries
400 .iter()
401 .filter(|(_e, is_checkpoint, version)| {
402 if let Some(max_version) = checkpoint_version {
403 if *is_checkpoint {
404 version < max_version
406 } else {
407 version <= max_version
410 }
411 } else {
412 true
413 }
414 })
415 .collect();
416 let paths = del_entries
417 .iter()
418 .map(|(e, _, _)| e.path().to_string())
419 .collect::<Vec<_>>();
420 let ret = paths.len();
421
422 debug!(
423 "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
424 ret, self.path, end, checkpoint_version, paths,
425 );
426
427 for (entry, _, _) in &del_entries {
429 self.remove_from_cache(entry.path()).await;
430 }
431
432 self.object_store
433 .delete_iter(paths)
434 .await
435 .context(OpenDalSnafu)?;
436
437 for (_, is_checkpoint, version) in &del_entries {
439 if *is_checkpoint {
440 self.unset_file_size(&FileKey::Checkpoint(*version));
441 } else {
442 self.unset_file_size(&FileKey::Delta(*version));
443 }
444 }
445
446 Ok(ret)
447 }
448
449 pub async fn save(
451 &mut self,
452 version: ManifestVersion,
453 bytes: &[u8],
454 is_staging: bool,
455 ) -> Result<()> {
456 let path = self.delta_file_path(version, is_staging);
457 debug!("Save log to manifest storage, version: {}", version);
458 let data = self
459 .compress_type
460 .encode(bytes)
461 .await
462 .context(CompressObjectSnafu {
463 compress_type: self.compress_type,
464 path: &path,
465 })?;
466 let delta_size = data.len();
467
468 self.write_and_put_cache(&path, data, is_staging).await?;
469 self.set_delta_file_size(version, delta_size as u64);
470
471 Ok(())
472 }
473
474 pub(crate) async fn save_checkpoint(
476 &self,
477 version: ManifestVersion,
478 bytes: &[u8],
479 ) -> Result<()> {
480 let path = self.checkpoint_file_path(version);
481 let data = self
482 .compress_type
483 .encode(bytes)
484 .await
485 .context(CompressObjectSnafu {
486 compress_type: self.compress_type,
487 path: &path,
488 })?;
489 let checkpoint_size = data.len();
490 let checksum = checkpoint_checksum(bytes);
491
492 self.write_and_put_cache(&path, data, false).await?;
493 self.set_checkpoint_file_size(version, checkpoint_size as u64);
494
495 let last_checkpoint_path = self.last_checkpoint_path();
497
498 let checkpoint_metadata = CheckpointMetadata {
499 size: bytes.len(),
500 version,
501 checksum: Some(checksum),
502 extend_metadata: HashMap::new(),
503 };
504
505 debug!(
506 "Save checkpoint in path: {}, metadata: {:?}",
507 last_checkpoint_path, checkpoint_metadata
508 );
509
510 let bytes = checkpoint_metadata.encode()?;
511 self.object_store
512 .write(&last_checkpoint_path, bytes)
513 .await
514 .context(OpenDalSnafu)?;
515
516 Ok(())
517 }
518
519 async fn load_checkpoint(
520 &mut self,
521 metadata: CheckpointMetadata,
522 ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
523 let version = metadata.version;
524 let path = self.checkpoint_file_path(version);
525
526 if let Some(data) = self.get_from_cache(&path, false).await {
528 verify_checksum(&data, metadata.checksum)?;
529 return Ok(Some((version, data)));
530 }
531
532 let checkpoint_data = match self.object_store.read(&path).await {
535 Ok(checkpoint) => {
536 let checkpoint_size = checkpoint.len();
537 let decompress_data =
538 self.compress_type
539 .decode(checkpoint)
540 .await
541 .with_context(|_| DecompressObjectSnafu {
542 compress_type: self.compress_type,
543 path: path.clone(),
544 })?;
545 verify_checksum(&decompress_data, metadata.checksum)?;
546 self.set_checkpoint_file_size(version, checkpoint_size as u64);
548 self.put_to_cache(path, &decompress_data, false).await;
550 Ok(Some(decompress_data))
551 }
552 Err(e) => {
553 if e.kind() == ErrorKind::NotFound {
554 if self.compress_type != FALL_BACK_COMPRESS_TYPE {
555 let fall_back_path = gen_path(
556 &self.path,
557 &checkpoint_file(version),
558 FALL_BACK_COMPRESS_TYPE,
559 );
560 debug!(
561 "Failed to load checkpoint from path: {}, fall back to path: {}",
562 path, fall_back_path
563 );
564
565 if let Some(data) = self.get_from_cache(&fall_back_path, false).await {
567 verify_checksum(&data, metadata.checksum)?;
568 return Ok(Some((version, data)));
569 }
570
571 match self.object_store.read(&fall_back_path).await {
572 Ok(checkpoint) => {
573 let checkpoint_size = checkpoint.len();
574 let decompress_data = FALL_BACK_COMPRESS_TYPE
575 .decode(checkpoint)
576 .await
577 .with_context(|_| DecompressObjectSnafu {
578 compress_type: FALL_BACK_COMPRESS_TYPE,
579 path: fall_back_path.clone(),
580 })?;
581 verify_checksum(&decompress_data, metadata.checksum)?;
582 self.set_checkpoint_file_size(version, checkpoint_size as u64);
583 self.put_to_cache(fall_back_path, &decompress_data, false)
585 .await;
586 Ok(Some(decompress_data))
587 }
588 Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
589 Err(e) => Err(e).context(OpenDalSnafu),
590 }
591 } else {
592 Ok(None)
593 }
594 } else {
595 Err(e).context(OpenDalSnafu)
596 }
597 }
598 }?;
599 Ok(checkpoint_data.map(|data| (version, data)))
600 }
601
602 pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
605 let last_checkpoint_path = self.last_checkpoint_path();
606
607 let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
609 Ok(data) => data.to_vec(),
610 Err(e) if e.kind() == ErrorKind::NotFound => {
611 return Ok(None);
612 }
613 Err(e) => {
614 return Err(e).context(OpenDalSnafu)?;
615 }
616 };
617
618 let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
619
620 debug!(
621 "Load checkpoint in path: {}, metadata: {:?}",
622 last_checkpoint_path, checkpoint_metadata
623 );
624
625 self.load_checkpoint(checkpoint_metadata).await
626 }
627
628 #[cfg(test)]
629 pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
630 self.object_store
631 .read(path)
632 .await
633 .context(OpenDalSnafu)
634 .map(|v| v.to_vec())
635 }
636
637 #[cfg(test)]
638 pub async fn write_last_checkpoint(
639 &mut self,
640 version: ManifestVersion,
641 bytes: &[u8],
642 ) -> Result<()> {
643 let path = self.checkpoint_file_path(version);
644 let data = self
645 .compress_type
646 .encode(bytes)
647 .await
648 .context(CompressObjectSnafu {
649 compress_type: self.compress_type,
650 path: &path,
651 })?;
652
653 let checkpoint_size = data.len();
654
655 self.object_store
656 .write(&path, data)
657 .await
658 .context(OpenDalSnafu)?;
659
660 self.set_checkpoint_file_size(version, checkpoint_size as u64);
661
662 let last_checkpoint_path = self.last_checkpoint_path();
663 let checkpoint_metadata = CheckpointMetadata {
664 size: bytes.len(),
665 version,
666 checksum: Some(1218259706),
667 extend_metadata: HashMap::new(),
668 };
669
670 debug!(
671 "Rewrite checkpoint in path: {}, metadata: {:?}",
672 last_checkpoint_path, checkpoint_metadata
673 );
674
675 let bytes = checkpoint_metadata.encode()?;
676
677 self.object_store
679 .write(&last_checkpoint_path, bytes.clone())
680 .await
681 .context(OpenDalSnafu)?;
682 Ok(())
683 }
684
685 pub(crate) fn total_manifest_size(&self) -> u64 {
687 self.manifest_size_map.read().unwrap().values().sum()
688 }
689
690 pub(crate) fn reset_manifest_size(&mut self) {
692 self.manifest_size_map.write().unwrap().clear();
693 self.total_manifest_size.store(0, Ordering::Relaxed);
694 }
695
696 pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
698 let mut m = self.manifest_size_map.write().unwrap();
699 m.insert(FileKey::Delta(version), size);
700
701 self.inc_total_manifest_size(size);
702 }
703
704 pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
706 let mut m = self.manifest_size_map.write().unwrap();
707 m.insert(FileKey::Checkpoint(version), size);
708
709 self.inc_total_manifest_size(size);
710 }
711
712 fn unset_file_size(&self, key: &FileKey) {
713 let mut m = self.manifest_size_map.write().unwrap();
714 if let Some(val) = m.remove(key) {
715 debug!("Unset file size: {:?}, size: {}", key, val);
716 self.dec_total_manifest_size(val);
717 }
718 }
719
720 fn inc_total_manifest_size(&self, val: u64) {
721 self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
722 }
723
724 fn dec_total_manifest_size(&self, val: u64) {
725 self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
726 }
727
728 pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
730 let manifest_entries = self
731 .get_paths(
732 |entry| {
733 let file_name = entry.name();
734 if is_delta_file(file_name) {
735 let version = file_version(file_name);
736 Some((version, entry))
737 } else {
738 None
739 }
740 },
741 true,
742 )
743 .await?;
744
745 let mut sorted_entries = manifest_entries;
746 Self::sort_manifests(&mut sorted_entries);
747
748 self.fetch_manifests_from_entries(sorted_entries, true)
749 .await
750 }
751
752 pub async fn clear_staging_manifests(&mut self) -> Result<()> {
754 self.object_store
755 .remove_all(&self.staging_path)
756 .await
757 .context(OpenDalSnafu)?;
758
759 debug!(
760 "Cleared all staging manifest files from {}",
761 self.staging_path
762 );
763
764 Ok(())
765 }
766
767 async fn get_from_cache(&self, key: &str, is_staging: bool) -> Option<Vec<u8>> {
771 if is_staging {
772 return None;
773 }
774 let cache = self.manifest_cache.as_ref()?;
775 cache.get_file(key).await
776 }
777
778 async fn put_to_cache(&self, key: String, data: &[u8], is_staging: bool) {
781 if is_staging {
782 return;
783 }
784 let Some(cache) = &self.manifest_cache else {
785 return;
786 };
787
788 cache.put_file(key, data.to_vec()).await;
789 }
790
791 async fn write_and_put_cache(&self, path: &str, data: Vec<u8>, is_staging: bool) -> Result<()> {
794 let cache_data = if !is_staging && self.manifest_cache.is_some() {
796 Some(data.clone())
797 } else {
798 None
799 };
800
801 self.object_store
803 .write(path, data)
804 .await
805 .context(OpenDalSnafu)?;
806
807 if let Some(data) = cache_data {
809 self.put_to_cache(path.to_string(), &data, is_staging).await;
810 }
811
812 Ok(())
813 }
814
815 async fn remove_from_cache(&self, key: &str) {
817 let Some(cache) = &self.manifest_cache else {
818 return;
819 };
820
821 cache.remove(key).await;
822 }
823}
824
/// Content of the `_last_checkpoint` file (serialized as JSON): describes the most
/// recently saved checkpoint.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    /// Length in bytes of the checkpoint payload before compression.
    pub size: usize,
    /// Manifest version of the checkpoint.
    pub version: ManifestVersion,
    /// CRC32 of the checkpoint payload before compression; `None` skips verification.
    pub checksum: Option<u32>,
    /// Extra key/value metadata (written empty by this module).
    pub extend_metadata: HashMap<String, String>,
}
833
834impl CheckpointMetadata {
835 fn encode(&self) -> Result<Vec<u8>> {
836 Ok(serde_json::to_string(self)
837 .context(SerdeJsonSnafu)?
838 .into_bytes())
839 }
840
841 fn decode(bs: &[u8]) -> Result<Self> {
842 let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
843
844 serde_json::from_str(data).context(SerdeJsonSnafu)
845 }
846}
847
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;

    use super::*;

    /// Builds a store rooted at "/" on a local-filesystem object store in a fresh
    /// temporary directory, uncompressed and without a manifest cache.
    // NOTE(review): `tmp_dir` is dropped when this function returns; confirm the Fs
    // backend tolerates the directory being cleaned up (it appears to recreate it).
    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
            None,
        )
    }

    /// Checkpoint metadata for `version` with no checksum (verification skipped).
    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

    #[test]
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        test_manifest_log_store_case(log_store).await;
    }

    /// Shared scenario: save deltas, range-fetch, checkpoint, then delete with and
    /// without keeping the last checkpoint.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        // The range is half-open: [1, 4) yields versions 1, 2, 3.
        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // An end beyond the last version returns everything that exists.
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // No `_last_checkpoint` file yet.
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Keeping the last checkpoint: deltas 0..=3 are removed, checkpoint 3 survives.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        // Only delta 4 remains.
        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // Not keeping the checkpoint: everything below 11 is removed, including
        // checkpoint 3 (the `_last_checkpoint` file still points at it).
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }

    /// Files written uncompressed stay readable after switching to gzip.
    #[tokio::test]
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        log_store.compress_type = CompressionType::Gzip;

        // Loaded via the uncompressed fallback path.
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Mixed uncompressed (0..5) and gzip (5..10) deltas decode transparently.
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // 10 deltas (0..10) + checkpoint 5; checkpoint 10 is not below 10.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }

    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // 5 deltas of 8 bytes ("hello, N") + a 23-byte checkpoint.
        assert_eq!(log_store.total_manifest_size(), 63);

        // Deletes deltas 0, 1, 2.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // 2 remaining deltas (16 bytes) + the checkpoint (23 bytes).
        assert_eq!(log_store.total_manifest_size(), 39);

        // Deletes deltas 3, 4 and checkpoint 5.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Sizes are of the gzip-encoded files as stored.
        assert_eq!(log_store.total_manifest_size(), 181);

        // Deletes deltas 0, 1, 2.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 97);

        // Deletes deltas 3, 4 and checkpoint 5.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
}