use std::collections::HashMap;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use crc32fast::Hasher;
use futures::TryStreamExt;
use futures::future::try_join_all;
use lazy_static::lazy_static;
use object_store::util::join_dir;
use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;

use crate::error::{
    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};

lazy_static! {
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}

/// Name of the file that records the metadata of the latest checkpoint.
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type used when manifest compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Fallback compression type, used both when compression is disabled and to read
/// legacy uncompressed files.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Maximum number of manifest files fetched concurrently.
const FETCH_MANIFEST_PARALLELISM: usize = 16;

/// Returns the directory that stores manifest files under the given region directory.
pub fn manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}

/// Returns the compression type for manifest files: the default type when `compress`
/// is true, otherwise the uncompressed fallback.
pub const fn manifest_compress_type(compress: bool) -> CompressionType {
    if compress {
        DEFAULT_MANIFEST_COMPRESSION_TYPE
    } else {
        FALL_BACK_COMPRESS_TYPE
    }
}

/// Returns the name of the delta manifest file with the given version.
pub fn delta_file(version: ManifestVersion) -> String {
    format!("{version:020}.json")
}

/// Returns the name of the checkpoint file with the given version.
pub fn checkpoint_file(version: ManifestVersion) -> String {
    format!("{version:020}.checkpoint")
}

/// Appends `file` to `path`, adding the compression extension unless `compress_type`
/// is `Uncompressed`.
pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
    if compress_type == CompressionType::Uncompressed {
        format!("{}{}", path, file)
    } else {
        format!("{}{}.{}", path, file, compress_type.file_extension())
    }
}

/// Computes the CRC32 checksum of the checkpoint data.
fn checkpoint_checksum(data: &[u8]) -> u32 {
    let mut hasher = Hasher::new();
    hasher.update(data);
    hasher.finalize()
}

/// Verifies the CRC32 checksum of `data` against `wanted`; `None` skips verification.
fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
    if let Some(checksum) = wanted {
        let calculated_checksum = checkpoint_checksum(data);
        ensure!(
            checksum == calculated_checksum,
            ChecksumMismatchSnafu {
                actual: calculated_checksum,
                expected: checksum,
            }
        );
    }
    Ok(())
}

/// Returns the manifest version encoded in the file name.
///
/// Panics if the file name does not start with a valid version number.
pub fn file_version(path: &str) -> ManifestVersion {
    let s = path.split('.').next().unwrap();
    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
}

/// Returns the compression type inferred from the file extension, falling back to
/// `Uncompressed` for unknown extensions.
pub fn file_compress_type(path: &str) -> CompressionType {
    let s = path.rsplit('.').next().unwrap_or("");
    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
}

/// Returns true if the file name is a delta manifest file.
pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}

/// Returns true if the file name is a checkpoint file.
pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}

/// Key of a manifest file used for size bookkeeping.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta manifest file with the given version.
    Delta(ManifestVersion),
    /// A checkpoint file with the given version.
    Checkpoint(ManifestVersion),
}

#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    object_store: ObjectStore,
    compress_type: CompressionType,
    path: String,
    staging_path: String,
    /// Size of each tracked manifest file.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Total size of all tracked manifest files, shared with the caller.
    total_manifest_size: Arc<AtomicU64>,
}

impl ManifestObjectStore {
    pub fn new(
        path: &str,
        object_store: ObjectStore,
        compress_type: CompressionType,
        total_manifest_size: Arc<AtomicU64>,
    ) -> Self {
        let path = util::normalize_dir(path);
        let staging_path = {
            // Derives the staging directory from the region directory, i.e.
            // `<region_dir>/staging/manifest/`.
            let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
            util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
        };
        Self {
            object_store,
            compress_type,
            path,
            staging_path,
            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
            total_manifest_size,
        }
    }

    /// Returns the path of the delta manifest file with the given version, under
    /// either the normal or the staging directory.
    fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
        let base_path = if is_staging {
            &self.staging_path
        } else {
            &self.path
        };
        gen_path(base_path, &delta_file(version), self.compress_type)
    }

    /// Returns the path of the checkpoint file with the given version.
    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
    }

    /// Returns the path of the `_last_checkpoint` metadata file.
    pub(crate) fn last_checkpoint_path(&self) -> String {
        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
    }

    /// Returns the manifest directory of this store.
    pub(crate) fn manifest_dir(&self) -> &str {
        &self.path
    }

    /// Returns a lister of the manifest (or staging) directory, or `None` if the
    /// directory does not exist yet.
    pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
        let path = if is_staging {
            &self.staging_path
        } else {
            &self.path
        };
        match self.object_store.lister_with(path).await {
            Ok(streamer) => Ok(Some(streamer)),
            Err(e) if e.kind() == ErrorKind::NotFound => {
                debug!("Manifest directory does not exist: {}", path);
                Ok(None)
            }
            Err(e) => Err(e).context(OpenDalSnafu)?,
        }
    }

    /// Returns all entries in the manifest (or staging) directory that pass `filter`.
    pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
    where
        F: Fn(Entry) -> Option<R>,
    {
        let Some(streamer) = self.manifest_lister(is_staging).await? else {
            return Ok(vec![]);
        };

        streamer
            .try_filter_map(|e| async { Ok(filter(e)) })
            .try_collect::<Vec<_>>()
            .await
            .context(OpenDalSnafu)
    }

    /// Sorts manifest entries by version in ascending order.
    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
    }

    /// Scans the manifest directory and returns delta manifest entries whose
    /// versions are in the range `[start, end)`, sorted by version.
    pub async fn scan(
        &self,
        start: ManifestVersion,
        end: ManifestVersion,
    ) -> Result<Vec<(ManifestVersion, Entry)>> {
        ensure!(start <= end, InvalidScanIndexSnafu { start, end });

        let mut entries: Vec<(ManifestVersion, Entry)> = self
            .get_paths(
                |entry| {
                    let file_name = entry.name();
                    if is_delta_file(file_name) {
                        let version = file_version(file_name);
                        if start <= version && version < end {
                            return Some((version, entry));
                        }
                    }
                    None
                },
                false,
            )
            .await?;

        Self::sort_manifests(&mut entries);

        Ok(entries)
    }

    /// Fetches manifests in the range `[start_version, end_version)` and returns the
    /// portion starting exactly at `start_version`; returns an empty vector if
    /// `start_version` itself is not present.
    pub async fn fetch_manifests_strict_from(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
        region_id: RegionId,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
        debug!(
            "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
            start_version,
            end_version,
            start_index,
            region_id,
            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
        );
        if let Some(start_index) = start_index {
            Ok(manifests.split_off(start_index))
        } else {
            Ok(vec![])
        }
    }

    /// Fetches and decompresses the content of the given manifest entries, reading at
    /// most `FETCH_MANIFEST_PARALLELISM` files concurrently.
    async fn fetch_manifests_from_entries(
        &self,
        entries: Vec<(ManifestVersion, Entry)>,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        if entries.is_empty() {
            return Ok(vec![]);
        }

        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

        let tasks = entries.iter().map(|(v, entry)| async {
            // Limits the number of concurrent reads.
            let _permit = semaphore.acquire().await.unwrap();

            let compress_type = file_compress_type(entry.name());
            let bytes = self
                .object_store
                .read(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let data = compress_type
                .decode(bytes)
                .await
                .context(DecompressObjectSnafu {
                    compress_type,
                    path: entry.path(),
                })?;
            Ok((*v, data))
        });

        try_join_all(tasks).await
    }

    /// Fetches the content of delta manifests with versions in `[start_version, end_version)`.
    ///
    /// Returns `(version, content)` pairs sorted by version.
    pub async fn fetch_manifests(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        let manifests = self.scan(start_version, end_version).await?;
        self.fetch_manifests_from_entries(manifests).await
    }

    /// Deletes delta and checkpoint files whose versions are smaller than `end`.
    ///
    /// If `keep_last_checkpoint` is true, the latest checkpoint below `end` and any
    /// delta files newer than that checkpoint are kept. Returns the number of
    /// deleted files.
    pub async fn delete_until(
        &self,
        end: ManifestVersion,
        keep_last_checkpoint: bool,
    ) -> Result<usize> {
        // Collects all delta and checkpoint files with a version smaller than `end`.
        let entries: Vec<_> = self
            .get_paths(
                |entry| {
                    let file_name = entry.name();
                    let is_checkpoint = is_checkpoint_file(file_name);
                    if is_delta_file(file_name) || is_checkpoint_file(file_name) {
                        let version = file_version(file_name);
                        if version < end {
                            return Some((entry, is_checkpoint, version));
                        }
                    }
                    None
                },
                false,
            )
            .await?;
        let checkpoint_version = if keep_last_checkpoint {
            // Finds the latest checkpoint to keep.
            entries
                .iter()
                .filter_map(
                    |(_e, is_checkpoint, version)| {
                        if *is_checkpoint { Some(version) } else { None }
                    },
                )
                .max()
        } else {
            None
        };
        let del_entries: Vec<_> = entries
            .iter()
            .filter(|(_e, is_checkpoint, version)| {
                if let Some(max_version) = checkpoint_version {
                    if *is_checkpoint {
                        // Keeps the latest checkpoint itself.
                        version < max_version
                    } else {
                        // Deltas up to and including the kept checkpoint's version
                        // are already covered by that checkpoint.
                        version <= max_version
                    }
                } else {
                    true
                }
            })
            .collect();
        let paths = del_entries
            .iter()
            .map(|(e, _, _)| e.path().to_string())
            .collect::<Vec<_>>();
        let ret = paths.len();

        debug!(
            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
            ret, self.path, end, checkpoint_version, paths,
        );

        self.object_store
            .delete_iter(paths)
            .await
            .context(OpenDalSnafu)?;

        // Removes the deleted files from the size bookkeeping.
        for (_, is_checkpoint, version) in &del_entries {
            if *is_checkpoint {
                self.unset_file_size(&FileKey::Checkpoint(*version));
            } else {
                self.unset_file_size(&FileKey::Delta(*version));
            }
        }

        Ok(ret)
    }

    /// Saves a delta manifest with the given version to the manifest directory, or to
    /// the staging directory when `is_staging` is true.
    pub async fn save(
        &mut self,
        version: ManifestVersion,
        bytes: &[u8],
        is_staging: bool,
    ) -> Result<()> {
        let path = self.delta_file_path(version, is_staging);
        debug!("Save log to manifest storage, version: {}", version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;
        let delta_size = data.len();
        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;
        self.set_delta_file_size(version, delta_size as u64);
        Ok(())
    }

    /// Saves the checkpoint of the given version and updates the `_last_checkpoint`
    /// metadata file.
    pub(crate) async fn save_checkpoint(
        &self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;
        let checkpoint_size = data.len();
        let checksum = checkpoint_checksum(bytes);
        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;
        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        // Updates the last checkpoint metadata.
        let last_checkpoint_path = self.last_checkpoint_path();

        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            checksum: Some(checksum),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Save checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;
        self.object_store
            .write(&last_checkpoint_path, bytes)
            .await
            .context(OpenDalSnafu)?;

        Ok(())
    }

    /// Loads the checkpoint described by `metadata` and verifies its checksum.
    ///
    /// Falls back to the uncompressed file name for checkpoints written before
    /// compression was enabled; returns `None` if no checkpoint file exists.
    async fn load_checkpoint(
        &mut self,
        metadata: CheckpointMetadata,
    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        let version = metadata.version;
        let path = self.checkpoint_file_path(version);
        let checkpoint_data = match self.object_store.read(&path).await {
            Ok(checkpoint) => {
                let checkpoint_size = checkpoint.len();
                let decompress_data = self.compress_type.decode(checkpoint).await.context(
                    DecompressObjectSnafu {
                        compress_type: self.compress_type,
                        path,
                    },
                )?;
                verify_checksum(&decompress_data, metadata.checksum)?;
                self.set_checkpoint_file_size(version, checkpoint_size as u64);
                Ok(Some(decompress_data))
            }
            Err(e) => {
                if e.kind() == ErrorKind::NotFound {
                    if self.compress_type != FALL_BACK_COMPRESS_TYPE {
                        // The compressed checkpoint is missing; retry with the
                        // uncompressed file name for backward compatibility.
                        let fall_back_path = gen_path(
                            &self.path,
                            &checkpoint_file(version),
                            FALL_BACK_COMPRESS_TYPE,
                        );
                        debug!(
                            "Failed to load checkpoint from path: {}, fall back to path: {}",
                            path, fall_back_path
                        );
                        match self.object_store.read(&fall_back_path).await {
                            Ok(checkpoint) => {
                                let checkpoint_size = checkpoint.len();
                                let decompress_data = FALL_BACK_COMPRESS_TYPE
                                    .decode(checkpoint)
                                    .await
                                    .context(DecompressObjectSnafu {
                                        compress_type: FALL_BACK_COMPRESS_TYPE,
                                        path,
                                    })?;
                                verify_checksum(&decompress_data, metadata.checksum)?;
                                self.set_checkpoint_file_size(version, checkpoint_size as u64);
                                Ok(Some(decompress_data))
                            }
                            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
                            Err(e) => Err(e).context(OpenDalSnafu),
                        }
                    } else {
                        Ok(None)
                    }
                } else {
                    Err(e).context(OpenDalSnafu)
                }
            }
        }?;
        Ok(checkpoint_data.map(|data| (version, data)))
    }

    /// Loads the latest checkpoint recorded in the `_last_checkpoint` file.
    ///
    /// Returns `None` if no checkpoint has been saved yet.
    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        let last_checkpoint_path = self.last_checkpoint_path();
        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
            Ok(data) => data,
            Err(e) if e.kind() == ErrorKind::NotFound => {
                return Ok(None);
            }
            Err(e) => {
                return Err(e).context(OpenDalSnafu)?;
            }
        };

        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;

        debug!(
            "Load checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        self.load_checkpoint(checkpoint_metadata).await
    }

    #[cfg(test)]
    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
        self.object_store
            .read(path)
            .await
            .context(OpenDalSnafu)
            .map(|v| v.to_vec())
    }

    #[cfg(test)]
    pub async fn write_last_checkpoint(
        &mut self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;

        let checkpoint_size = data.len();

        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;

        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        let last_checkpoint_path = self.last_checkpoint_path();
        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            checksum: Some(1218259706),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Rewrite checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;

        self.object_store
            .write(&last_checkpoint_path, bytes.clone())
            .await
            .context(OpenDalSnafu)?;
        Ok(())
    }

    /// Returns the total size of all tracked manifest files.
    pub(crate) fn total_manifest_size(&self) -> u64 {
        self.manifest_size_map.read().unwrap().values().sum()
    }

    /// Clears the size bookkeeping and resets the shared total to zero.
    pub(crate) fn reset_manifest_size(&mut self) {
        self.manifest_size_map.write().unwrap().clear();
        self.total_manifest_size.store(0, Ordering::Relaxed);
    }

    /// Records the size of the delta file with the given version.
    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
        let mut m = self.manifest_size_map.write().unwrap();
        m.insert(FileKey::Delta(version), size);

        self.inc_total_manifest_size(size);
    }

    /// Records the size of the checkpoint file with the given version.
    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
        let mut m = self.manifest_size_map.write().unwrap();
        m.insert(FileKey::Checkpoint(version), size);

        self.inc_total_manifest_size(size);
    }

    /// Removes the size record of the given file and decreases the total accordingly.
    fn unset_file_size(&self, key: &FileKey) {
        let mut m = self.manifest_size_map.write().unwrap();
        if let Some(val) = m.remove(key) {
            debug!("Unset file size: {:?}, size: {}", key, val);
            self.dec_total_manifest_size(val);
        }
    }

    fn inc_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
    }

    fn dec_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
    }

    /// Fetches the content of all delta manifests in the staging directory, sorted by version.
    pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        let manifest_entries = self
            .get_paths(
                |entry| {
                    let file_name = entry.name();
                    if is_delta_file(file_name) {
                        let version = file_version(file_name);
                        Some((version, entry))
                    } else {
                        None
                    }
                },
                true,
            )
            .await?;

        let mut sorted_entries = manifest_entries;
        Self::sort_manifests(&mut sorted_entries);

        self.fetch_manifests_from_entries(sorted_entries).await
    }

    /// Removes every file under the staging directory.
    pub async fn clear_staging_manifests(&mut self) -> Result<()> {
        self.object_store
            .remove_all(&self.staging_path)
            .await
            .context(OpenDalSnafu)?;

        debug!(
            "Cleared all staging manifest files from {}",
            self.staging_path
        );

        Ok(())
    }
}

#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    pub size: usize,
    pub version: ManifestVersion,
    /// CRC32 checksum of the uncompressed checkpoint content.
    pub checksum: Option<u32>,
    pub extend_metadata: HashMap<String, String>,
}

impl CheckpointMetadata {
    fn encode(&self) -> Result<Vec<u8>> {
        Ok(serde_json::to_string(self)
            .context(SerdeJsonSnafu)?
            .into_bytes())
    }

    fn decode(bs: &[u8]) -> Result<Self> {
        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;

        serde_json::from_str(data).context(SerdeJsonSnafu)
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;

    use super::*;

    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
        )
    }

    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

    #[test]
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }
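
    // A small sanity check for the CRC32 helpers above: a matching checksum passes,
    // `None` (no recorded checksum) skips verification, and a mismatch is rejected.
    // The input bytes here are arbitrary.
    #[test]
    fn test_verify_checkpoint_checksum() {
        let data = b"checkpoint";
        let checksum = checkpoint_checksum(data);
        assert!(verify_checksum(data, Some(checksum)).is_ok());
        assert!(verify_checksum(data, None).is_ok());
        assert!(verify_checksum(data, Some(checksum.wrapping_add(1))).is_err());
    }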

    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        test_manifest_log_store_case(log_store).await;
    }

    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // No checkpoint has been saved yet.
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Deletes logs until version 4 while keeping the last checkpoint.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // Deletes everything, including the checkpoint.
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }
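
    // A round-trip check for `CheckpointMetadata`'s JSON codec; the field values are
    // arbitrary and only used to confirm that encode/decode preserve them.
    #[test]
    fn test_checkpoint_metadata_codec() {
        let metadata = CheckpointMetadata {
            size: 16,
            version: 7,
            checksum: Some(42),
            extend_metadata: HashMap::new(),
        };
        let bytes = metadata.encode().unwrap();
        let decoded = CheckpointMetadata::decode(&bytes).unwrap();
        assert_eq!(decoded.size, 16);
        assert_eq!(decoded.version, 7);
        assert_eq!(decoded.checksum, Some(42));
        assert!(decoded.extend_metadata.is_empty());
    }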

    #[tokio::test]
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // Writes uncompressed manifests first.
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // Switches to compressed manifests.
        log_store.compress_type = CompressionType::Gzip;

        // The uncompressed checkpoint is still readable through the fallback path.
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // Writes compressed manifests.
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Reads a mix of compressed and uncompressed files.
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Deletes the 10 deltas and the checkpoint at version 5 (11 files in total).
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }
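
    // Sanity checks for the file-name helpers: `manifest_compress_type` only picks the
    // gzip default when compression is enabled, delta/checkpoint names are recognized
    // with or without a compression extension, and an unknown extension falls back to
    // `Uncompressed`. The file names used here are arbitrary.
    #[test]
    fn test_file_name_helpers() {
        assert_eq!(manifest_compress_type(true), CompressionType::Gzip);
        assert_eq!(manifest_compress_type(false), CompressionType::Uncompressed);

        assert!(is_delta_file("00000000000000000007.json"));
        assert!(is_delta_file("00000000000000000007.json.gz"));
        assert!(!is_delta_file("_last_checkpoint"));

        assert!(is_checkpoint_file("00000000000000000007.checkpoint"));
        assert!(!is_checkpoint_file("00000000000000000007.json"));

        // "json" is not a known compression extension, so the type falls back.
        assert_eq!(
            file_compress_type("00000000000000000007.json"),
            CompressionType::Uncompressed
        );
    }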

    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        assert_eq!(log_store.total_manifest_size(), 63);

        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 39);

        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        assert_eq!(log_store.total_manifest_size(), 181);

        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 97);

        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
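
    // A minimal sketch of the staging workflow: deltas saved with `is_staging = true`
    // land under the derived staging path, come back sorted from
    // `fetch_staging_manifests`, and `clear_staging_manifests` removes them. This
    // assumes the local Fs backend creates parent directories on write, as the other
    // tests here already rely on.
    #[tokio::test]
    async fn test_staging_manifest_roundtrip() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;

        for v in 0..3 {
            log_store
                .save(v, format!("staging, {v}").as_bytes(), true)
                .await
                .unwrap();
        }

        let manifests = log_store.fetch_staging_manifests().await.unwrap();
        assert_eq!(manifests.len(), 3);
        for (i, (version, bytes)) in manifests.into_iter().enumerate() {
            assert_eq!(i as ManifestVersion, version);
            assert_eq!(format!("staging, {i}").as_bytes(), bytes);
        }

        log_store.clear_staging_manifests().await.unwrap();
    }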
}