1use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::future::try_join_all;
25use futures::TryStreamExt;
26use lazy_static::lazy_static;
27use object_store::{util, Entry, ErrorKind, Lister, ObjectStore};
28use regex::Regex;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, ResultExt};
31use store_api::storage::RegionId;
32use store_api::ManifestVersion;
33use tokio::sync::Semaphore;
34
35use crate::error::{
36 ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
37 OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
38};
39
40lazy_static! {
41 static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
42 static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
43}
44
45const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
46const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
47const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
50const FETCH_MANIFEST_PARALLELISM: usize = 16;
51
52pub const fn manifest_compress_type(compress: bool) -> CompressionType {
54 if compress {
55 DEFAULT_MANIFEST_COMPRESSION_TYPE
56 } else {
57 FALL_BACK_COMPRESS_TYPE
58 }
59}
60
61pub fn delta_file(version: ManifestVersion) -> String {
62 format!("{version:020}.json")
63}
64
65pub fn checkpoint_file(version: ManifestVersion) -> String {
66 format!("{version:020}.checkpoint")
67}
68
69pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
70 if compress_type == CompressionType::Uncompressed {
71 format!("{}{}", path, file)
72 } else {
73 format!("{}{}.{}", path, file, compress_type.file_extension())
74 }
75}
76
77fn checkpoint_checksum(data: &[u8]) -> u32 {
78 let mut hasher = Hasher::new();
79 hasher.update(data);
80 hasher.finalize()
81}
82
83fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
84 if let Some(checksum) = wanted {
85 let calculated_checksum = checkpoint_checksum(data);
86 ensure!(
87 checksum == calculated_checksum,
88 ChecksumMismatchSnafu {
89 actual: calculated_checksum,
90 expected: checksum,
91 }
92 );
93 }
94 Ok(())
95}
96
97pub fn file_version(path: &str) -> ManifestVersion {
102 let s = path.split('.').next().unwrap();
103 s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
104}
105
106pub fn file_compress_type(path: &str) -> CompressionType {
111 let s = path.rsplit('.').next().unwrap_or("");
112 CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
113}
114
115pub fn is_delta_file(file_name: &str) -> bool {
116 DELTA_RE.is_match(file_name)
117}
118
119pub fn is_checkpoint_file(file_name: &str) -> bool {
120 CHECKPOINT_RE.is_match(file_name)
121}
122
/// Identifies a manifest file whose size is tracked in `ManifestObjectStore::manifest_size_map`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// The delta (log) file of the given version.
    Delta(ManifestVersion),
    /// The checkpoint file of the given version.
    Checkpoint(ManifestVersion),
}
131
/// Reads and writes manifest files (delta logs, checkpoints and the `_last_checkpoint`
/// metadata file) in an `ObjectStore`. It also tracks the on-disk sizes of the
/// manifest files it has written or loaded.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Backend that stores the manifest files.
    object_store: ObjectStore,
    /// Compression used for delta and checkpoint files written by this store. It is
    /// also the first format tried when loading a checkpoint.
    compress_type: CompressionType,
    /// Manifest directory, normalized by `util::normalize_dir`.
    path: String,
    /// Directory that receives delta files saved with `is_staging` set.
    staging_path: String,
    /// Size in bytes of each manifest file recorded by this store, as stored (i.e.
    /// after compression), keyed by file kind and version.
    // Wrapped in `Arc`, so clones of this store share the same map.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Externally supplied counter adjusted whenever a size is recorded or forgotten.
    total_manifest_size: Arc<AtomicU64>,
}
142
143impl ManifestObjectStore {
144 pub fn new(
145 path: &str,
146 object_store: ObjectStore,
147 compress_type: CompressionType,
148 total_manifest_size: Arc<AtomicU64>,
149 ) -> Self {
150 let path = util::normalize_dir(path);
151 let staging_path = {
152 let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
154 util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
155 };
156 Self {
157 object_store,
158 compress_type,
159 path,
160 staging_path,
161 manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
162 total_manifest_size,
163 }
164 }
165
166 fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
168 let base_path = if is_staging {
169 &self.staging_path
170 } else {
171 &self.path
172 };
173 gen_path(base_path, &delta_file(version), self.compress_type)
174 }
175
176 fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
178 gen_path(&self.path, &checkpoint_file(version), self.compress_type)
179 }
180
181 pub(crate) fn last_checkpoint_path(&self) -> String {
184 format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
185 }
186
187 pub(crate) fn manifest_dir(&self) -> &str {
189 &self.path
190 }
191
192 pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
194 match self.object_store.lister_with(&self.path).await {
195 Ok(streamer) => Ok(Some(streamer)),
196 Err(e) if e.kind() == ErrorKind::NotFound => {
197 debug!("Manifest directory does not exists: {}", self.path);
198 Ok(None)
199 }
200 Err(e) => Err(e).context(OpenDalSnafu)?,
201 }
202 }
203
204 pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
208 where
209 F: Fn(Entry) -> Option<R>,
210 {
211 let Some(streamer) = self.manifest_lister().await? else {
212 return Ok(vec![]);
213 };
214
215 streamer
216 .try_filter_map(|e| async { Ok(filter(e)) })
217 .try_collect::<Vec<_>>()
218 .await
219 .context(OpenDalSnafu)
220 }
221
222 fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
224 entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
225 }
226
227 pub async fn scan(
229 &self,
230 start: ManifestVersion,
231 end: ManifestVersion,
232 ) -> Result<Vec<(ManifestVersion, Entry)>> {
233 ensure!(start <= end, InvalidScanIndexSnafu { start, end });
234
235 let mut entries: Vec<(ManifestVersion, Entry)> = self
236 .get_paths(|entry| {
237 let file_name = entry.name();
238 if is_delta_file(file_name) {
239 let version = file_version(file_name);
240 if start <= version && version < end {
241 return Some((version, entry));
242 }
243 }
244 None
245 })
246 .await?;
247
248 Self::sort_manifests(&mut entries);
249
250 Ok(entries)
251 }
252
253 pub async fn fetch_manifests_strict_from(
257 &self,
258 start_version: ManifestVersion,
259 end_version: ManifestVersion,
260 region_id: RegionId,
261 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
262 let mut manifests = self.fetch_manifests(start_version, end_version).await?;
263 let start_index = manifests.iter().position(|(v, _)| *v == start_version);
264 debug!(
265 "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
266 start_version,
267 end_version,
268 start_index,
269 region_id,
270 manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
271 );
272 if let Some(start_index) = start_index {
273 Ok(manifests.split_off(start_index))
274 } else {
275 Ok(vec![])
276 }
277 }
278
279 pub async fn fetch_manifests(
284 &self,
285 start_version: ManifestVersion,
286 end_version: ManifestVersion,
287 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
288 let manifests = self.scan(start_version, end_version).await?;
289
290 let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
292
293 let tasks = manifests.iter().map(|(v, entry)| async {
294 let _permit = semaphore.acquire().await.unwrap();
296
297 let compress_type = file_compress_type(entry.name());
298 let bytes = self
299 .object_store
300 .read(entry.path())
301 .await
302 .context(OpenDalSnafu)?;
303 let data = compress_type
304 .decode(bytes)
305 .await
306 .context(DecompressObjectSnafu {
307 compress_type,
308 path: entry.path(),
309 })?;
310 Ok((*v, data))
311 });
312
313 try_join_all(tasks).await
314 }
315
316 pub async fn delete_until(
321 &self,
322 end: ManifestVersion,
323 keep_last_checkpoint: bool,
324 ) -> Result<usize> {
325 let entries: Vec<_> = self
327 .get_paths(|entry| {
328 let file_name = entry.name();
329 let is_checkpoint = is_checkpoint_file(file_name);
330 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
331 let version = file_version(file_name);
332 if version < end {
333 return Some((entry, is_checkpoint, version));
334 }
335 }
336 None
337 })
338 .await?;
339 let checkpoint_version = if keep_last_checkpoint {
340 entries
342 .iter()
343 .filter_map(
344 |(_e, is_checkpoint, version)| {
345 if *is_checkpoint {
346 Some(version)
347 } else {
348 None
349 }
350 },
351 )
352 .max()
353 } else {
354 None
355 };
356 let del_entries: Vec<_> = entries
357 .iter()
358 .filter(|(_e, is_checkpoint, version)| {
359 if let Some(max_version) = checkpoint_version {
360 if *is_checkpoint {
361 version < max_version
363 } else {
364 version <= max_version
367 }
368 } else {
369 true
370 }
371 })
372 .collect();
373 let paths = del_entries
374 .iter()
375 .map(|(e, _, _)| e.path().to_string())
376 .collect::<Vec<_>>();
377 let ret = paths.len();
378
379 debug!(
380 "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
381 ret,
382 self.path,
383 end,
384 checkpoint_version,
385 paths,
386 );
387
388 self.object_store
389 .delete_iter(paths)
390 .await
391 .context(OpenDalSnafu)?;
392
393 for (_, is_checkpoint, version) in &del_entries {
395 if *is_checkpoint {
396 self.unset_file_size(&FileKey::Checkpoint(*version));
397 } else {
398 self.unset_file_size(&FileKey::Delta(*version));
399 }
400 }
401
402 Ok(ret)
403 }
404
405 pub async fn save(
407 &mut self,
408 version: ManifestVersion,
409 bytes: &[u8],
410 is_staging: bool,
411 ) -> Result<()> {
412 let path = self.delta_file_path(version, is_staging);
413 debug!("Save log to manifest storage, version: {}", version);
414 let data = self
415 .compress_type
416 .encode(bytes)
417 .await
418 .context(CompressObjectSnafu {
419 compress_type: self.compress_type,
420 path: &path,
421 })?;
422 let delta_size = data.len();
423 self.object_store
424 .write(&path, data)
425 .await
426 .context(OpenDalSnafu)?;
427 self.set_delta_file_size(version, delta_size as u64);
428 Ok(())
429 }
430
431 pub(crate) async fn save_checkpoint(
433 &self,
434 version: ManifestVersion,
435 bytes: &[u8],
436 ) -> Result<()> {
437 let path = self.checkpoint_file_path(version);
438 let data = self
439 .compress_type
440 .encode(bytes)
441 .await
442 .context(CompressObjectSnafu {
443 compress_type: self.compress_type,
444 path: &path,
445 })?;
446 let checkpoint_size = data.len();
447 let checksum = checkpoint_checksum(bytes);
448 self.object_store
449 .write(&path, data)
450 .await
451 .context(OpenDalSnafu)?;
452 self.set_checkpoint_file_size(version, checkpoint_size as u64);
453
454 let last_checkpoint_path = self.last_checkpoint_path();
456
457 let checkpoint_metadata = CheckpointMetadata {
458 size: bytes.len(),
459 version,
460 checksum: Some(checksum),
461 extend_metadata: HashMap::new(),
462 };
463
464 debug!(
465 "Save checkpoint in path: {}, metadata: {:?}",
466 last_checkpoint_path, checkpoint_metadata
467 );
468
469 let bytes = checkpoint_metadata.encode()?;
470 self.object_store
471 .write(&last_checkpoint_path, bytes)
472 .await
473 .context(OpenDalSnafu)?;
474
475 Ok(())
476 }
477
478 async fn load_checkpoint(
479 &mut self,
480 metadata: CheckpointMetadata,
481 ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
482 let version = metadata.version;
483 let path = self.checkpoint_file_path(version);
484 let checkpoint_data =
487 match self.object_store.read(&path).await {
488 Ok(checkpoint) => {
489 let checkpoint_size = checkpoint.len();
490 let decompress_data = self.compress_type.decode(checkpoint).await.context(
491 DecompressObjectSnafu {
492 compress_type: self.compress_type,
493 path,
494 },
495 )?;
496 verify_checksum(&decompress_data, metadata.checksum)?;
497 self.set_checkpoint_file_size(version, checkpoint_size as u64);
499 Ok(Some(decompress_data))
500 }
501 Err(e) => {
502 if e.kind() == ErrorKind::NotFound {
503 if self.compress_type != FALL_BACK_COMPRESS_TYPE {
504 let fall_back_path = gen_path(
505 &self.path,
506 &checkpoint_file(version),
507 FALL_BACK_COMPRESS_TYPE,
508 );
509 debug!(
510 "Failed to load checkpoint from path: {}, fall back to path: {}",
511 path, fall_back_path
512 );
513 match self.object_store.read(&fall_back_path).await {
514 Ok(checkpoint) => {
515 let checkpoint_size = checkpoint.len();
516 let decompress_data = FALL_BACK_COMPRESS_TYPE
517 .decode(checkpoint)
518 .await
519 .context(DecompressObjectSnafu {
520 compress_type: FALL_BACK_COMPRESS_TYPE,
521 path,
522 })?;
523 verify_checksum(&decompress_data, metadata.checksum)?;
524 self.set_checkpoint_file_size(version, checkpoint_size as u64);
525 Ok(Some(decompress_data))
526 }
527 Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
528 Err(e) => Err(e).context(OpenDalSnafu),
529 }
530 } else {
531 Ok(None)
532 }
533 } else {
534 Err(e).context(OpenDalSnafu)
535 }
536 }
537 }?;
538 Ok(checkpoint_data.map(|data| (version, data)))
539 }
540
541 pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
544 let last_checkpoint_path = self.last_checkpoint_path();
545 let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
546 Ok(data) => data,
547 Err(e) if e.kind() == ErrorKind::NotFound => {
548 return Ok(None);
549 }
550 Err(e) => {
551 return Err(e).context(OpenDalSnafu)?;
552 }
553 };
554
555 let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
556
557 debug!(
558 "Load checkpoint in path: {}, metadata: {:?}",
559 last_checkpoint_path, checkpoint_metadata
560 );
561
562 self.load_checkpoint(checkpoint_metadata).await
563 }
564
565 #[cfg(test)]
566 pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
567 self.object_store
568 .read(path)
569 .await
570 .context(OpenDalSnafu)
571 .map(|v| v.to_vec())
572 }
573
574 #[cfg(test)]
575 pub async fn write_last_checkpoint(
576 &mut self,
577 version: ManifestVersion,
578 bytes: &[u8],
579 ) -> Result<()> {
580 let path = self.checkpoint_file_path(version);
581 let data = self
582 .compress_type
583 .encode(bytes)
584 .await
585 .context(CompressObjectSnafu {
586 compress_type: self.compress_type,
587 path: &path,
588 })?;
589
590 let checkpoint_size = data.len();
591
592 self.object_store
593 .write(&path, data)
594 .await
595 .context(OpenDalSnafu)?;
596
597 self.set_checkpoint_file_size(version, checkpoint_size as u64);
598
599 let last_checkpoint_path = self.last_checkpoint_path();
600 let checkpoint_metadata = CheckpointMetadata {
601 size: bytes.len(),
602 version,
603 checksum: Some(1218259706),
604 extend_metadata: HashMap::new(),
605 };
606
607 debug!(
608 "Rewrite checkpoint in path: {}, metadata: {:?}",
609 last_checkpoint_path, checkpoint_metadata
610 );
611
612 let bytes = checkpoint_metadata.encode()?;
613
614 self.object_store
616 .write(&last_checkpoint_path, bytes.clone())
617 .await
618 .context(OpenDalSnafu)?;
619 Ok(())
620 }
621
622 pub(crate) fn total_manifest_size(&self) -> u64 {
624 self.manifest_size_map.read().unwrap().values().sum()
625 }
626
627 pub(crate) fn reset_manifest_size(&mut self) {
629 self.manifest_size_map.write().unwrap().clear();
630 self.total_manifest_size.store(0, Ordering::Relaxed);
631 }
632
633 pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
635 let mut m = self.manifest_size_map.write().unwrap();
636 m.insert(FileKey::Delta(version), size);
637
638 self.inc_total_manifest_size(size);
639 }
640
641 pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
643 let mut m = self.manifest_size_map.write().unwrap();
644 m.insert(FileKey::Checkpoint(version), size);
645
646 self.inc_total_manifest_size(size);
647 }
648
649 fn unset_file_size(&self, key: &FileKey) {
650 let mut m = self.manifest_size_map.write().unwrap();
651 if let Some(val) = m.remove(key) {
652 debug!("Unset file size: {:?}, size: {}", key, val);
653 self.dec_total_manifest_size(val);
654 }
655 }
656
657 fn inc_total_manifest_size(&self, val: u64) {
658 self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
659 }
660
661 fn dec_total_manifest_size(&self, val: u64) {
662 self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
663 }
664}
665
/// Contents of the `_last_checkpoint` file, serialized as JSON. It describes the
/// most recently saved checkpoint.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    /// Length in bytes of the uncompressed checkpoint data.
    pub size: usize,
    /// Manifest version of the checkpoint.
    pub version: ManifestVersion,
    /// CRC32 of the uncompressed checkpoint data; `None` skips verification on load.
    pub checksum: Option<u32>,
    /// Additional key/value metadata. `save_checkpoint` writes it empty.
    // NOTE(review): no reader of this map is visible in this file — confirm its intended use.
    pub extend_metadata: HashMap<String, String>,
}
674
675impl CheckpointMetadata {
676 fn encode(&self) -> Result<Vec<u8>> {
677 Ok(serde_json::to_string(self)
678 .context(SerdeJsonSnafu)?
679 .into_bytes())
680 }
681
682 fn decode(bs: &[u8]) -> Result<Self> {
683 let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
684
685 serde_json::from_str(data).context(SerdeJsonSnafu)
686 }
687}
688
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;

    use super::*;

    /// Builds an uncompressed store rooted at `/`, backed by the local filesystem in
    /// a directory created by `create_temp_dir`.
    // NOTE(review): `tmp_dir` is dropped when this function returns. If `TempDir` removes
    // its directory on drop, the store keeps writing under a path that is recreated on
    // demand and never cleaned up — confirm, and consider returning the guard too.
    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
        )
    }

    /// Metadata for a checkpoint at `version` without a checksum, so loading skips
    /// checksum verification.
    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

    #[test]
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        test_manifest_log_store_case(log_store).await;
    }

    /// Shared scenario: save deltas 0..5, fetch sub-ranges, checkpoint at 3, then
    /// delete with and without keeping the last checkpoint.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        // The end version is exclusive: [1, 4) yields versions 1, 2 and 3.
        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // A range extending past the last delta returns only what exists.
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // No `_last_checkpoint` file has been written yet.
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Keeping the last checkpoint (3): deltas 0..=3 are deleted, while checkpoint 3
        // and delta 4 remain.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // Without keeping a checkpoint, everything below 11 goes, including checkpoint 3.
        // `_last_checkpoint` itself is not a delta/checkpoint file, so it survives, but
        // the checkpoint it names is gone and loading yields `None`.
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .is_none());
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // First write deltas 0..5 and checkpoint 5 without compression.
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // Switch to gzip: the uncompressed checkpoint must still load via the fallback path.
        log_store.compress_type = CompressionType::Gzip;

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // Deltas 5..10 and checkpoint 10 are written compressed.
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // The scan mixes uncompressed and gzip deltas; each is decoded by its own extension.
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // 10 deltas plus checkpoint 5; checkpoint 10 is not below the end version.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }

    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // 5 deltas of 8 bytes ("hello, N") plus a 23-byte checkpoint.
        assert_eq!(log_store.total_manifest_size(), 63);

        // Deletes deltas 0, 1 and 2.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 39);

        // Deletes deltas 3, 4 and checkpoint 5.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Sizes are of the gzip-encoded files as produced by the current encoder.
        assert_eq!(log_store.total_manifest_size(), 181);

        // Deletes deltas 0, 1 and 2.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 97);

        // Deletes deltas 3, 4 and checkpoint 5.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
}