use std::collections::HashMap;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use crc32fast::Hasher;
use futures::future::try_join_all;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use object_store::{util, Entry, ErrorKind, Lister, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;

use crate::error::{
    ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
    OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};

lazy_static! {
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}

/// File that records the metadata of the latest checkpoint.
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type used when manifest compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Compression type to fall back to when a manifest file is not found with the
/// configured compression type (e.g. files written before compression was enabled).
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Maximum number of manifest files fetched concurrently.
const FETCH_MANIFEST_PARALLELISM: usize = 16;

/// Returns the compression type for manifest files: the default compression type if
/// `compress` is true, otherwise the uncompressed fallback.
pub const fn manifest_compress_type(compress: bool) -> CompressionType {
    if compress {
        DEFAULT_MANIFEST_COMPRESSION_TYPE
    } else {
        FALL_BACK_COMPRESS_TYPE
    }
}

/// Returns the delta file name of the given version, e.g. `00000000000000000007.json`.
pub fn delta_file(version: ManifestVersion) -> String {
    format!("{version:020}.json")
}

/// Returns the checkpoint file name of the given version, e.g. `00000000000000000007.checkpoint`.
pub fn checkpoint_file(version: ManifestVersion) -> String {
    format!("{version:020}.checkpoint")
}

/// Builds the full path of a manifest file, appending the compression file extension
/// unless the file is uncompressed.
pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
    if compress_type == CompressionType::Uncompressed {
        format!("{}{}", path, file)
    } else {
        format!("{}{}.{}", path, file, compress_type.file_extension())
    }
}

/// Computes the CRC32 checksum of the checkpoint data.
fn checkpoint_checksum(data: &[u8]) -> u32 {
    let mut hasher = Hasher::new();
    hasher.update(data);
    hasher.finalize()
}

/// Verifies the checksum of `data` if a checksum is provided; does nothing otherwise.
fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
    if let Some(checksum) = wanted {
        let calculated_checksum = checkpoint_checksum(data);
        ensure!(
            checksum == calculated_checksum,
            ChecksumMismatchSnafu {
                actual: calculated_checksum,
                expected: checksum,
            }
        );
    }
    Ok(())
}

/// Extracts the manifest version from a manifest file name.
///
/// # Panics
/// Panics if the file name does not start with a valid version number.
pub fn file_version(path: &str) -> ManifestVersion {
    let s = path.split('.').next().unwrap();
    s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
}

/// Returns the compression type of a manifest file from its extension, defaulting to
/// `Uncompressed` when the extension is not a known compression type.
pub fn file_compress_type(path: &str) -> CompressionType {
    let s = path.rsplit('.').next().unwrap_or("");
    CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
}

/// Returns true if the file name is a delta manifest file (`<version>.json[.<ext>]`).
pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}

/// Returns true if the file name is a checkpoint file (`<version>.checkpoint[.<ext>]`).
pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}

/// Key identifying a manifest file when tracking per-file sizes.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta manifest file.
    Delta(ManifestVersion),
    /// A checkpoint file.
    Checkpoint(ManifestVersion),
}

/// Stores manifest files in an object store, optionally compressed.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    object_store: ObjectStore,
    compress_type: CompressionType,
    path: String,
    /// Size of each manifest file, keyed by file kind and version.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Total size of all manifest files.
    total_manifest_size: Arc<AtomicU64>,
}

impl ManifestObjectStore {
    pub fn new(
        path: &str,
        object_store: ObjectStore,
        compress_type: CompressionType,
        total_manifest_size: Arc<AtomicU64>,
    ) -> Self {
        Self {
            object_store,
            compress_type,
            path: util::normalize_dir(path),
            manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
            total_manifest_size,
        }
    }

    /// Returns the full path of a delta file with the current compression type.
    fn delta_file_path(&self, version: ManifestVersion) -> String {
        gen_path(&self.path, &delta_file(version), self.compress_type)
    }

    /// Returns the full path of a checkpoint file with the current compression type.
    fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
        gen_path(&self.path, &checkpoint_file(version), self.compress_type)
    }

    /// Returns the path of the `_last_checkpoint` metadata file.
    pub(crate) fn last_checkpoint_path(&self) -> String {
        format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
    }

    /// Returns the directory that holds the manifest files.
    pub(crate) fn manifest_dir(&self) -> &str {
        &self.path
    }

    /// Returns a lister of the manifest directory, or `None` if the directory does not exist.
    pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
        match self.object_store.lister_with(&self.path).await {
            Ok(streamer) => Ok(Some(streamer)),
            Err(e) if e.kind() == ErrorKind::NotFound => {
                debug!("Manifest directory does not exist: {}", self.path);
                Ok(None)
            }
            Err(e) => Err(e).context(OpenDalSnafu)?,
        }
    }

    /// Returns the entries of the manifest directory that pass `filter`,
    /// or an empty vec if the directory does not exist.
    pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
    where
        F: Fn(Entry) -> Option<R>,
    {
        let Some(streamer) = self.manifest_lister().await? else {
            return Ok(vec![]);
        };

        streamer
            .try_filter_map(|e| async { Ok(filter(e)) })
            .try_collect::<Vec<_>>()
            .await
            .context(OpenDalSnafu)
    }

    /// Sorts the manifest entries by version in ascending order.
    fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
        entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
    }

    /// Scans the manifest directory and returns the delta manifest entries whose
    /// versions are in `[start, end)`, sorted by version.
    pub async fn scan(
        &self,
        start: ManifestVersion,
        end: ManifestVersion,
    ) -> Result<Vec<(ManifestVersion, Entry)>> {
        ensure!(start <= end, InvalidScanIndexSnafu { start, end });

        let mut entries: Vec<(ManifestVersion, Entry)> = self
            .get_paths(|entry| {
                let file_name = entry.name();
                if is_delta_file(file_name) {
                    let version = file_version(file_name);
                    if start <= version && version < end {
                        return Some((version, entry));
                    }
                }
                None
            })
            .await?;

        Self::sort_manifests(&mut entries);

        Ok(entries)
    }

    /// Fetches the manifests in `[start_version, end_version)` and returns the suffix
    /// starting exactly at `start_version`. Returns an empty vec if `start_version`
    /// itself is not found.
    pub async fn fetch_manifests_strict_from(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
        region_id: RegionId,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        let mut manifests = self.fetch_manifests(start_version, end_version).await?;
        let start_index = manifests.iter().position(|(v, _)| *v == start_version);
        debug!(
            "Fetch manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
            start_version,
            end_version,
            start_index,
            region_id,
            manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
        );
        if let Some(start_index) = start_index {
            Ok(manifests.split_off(start_index))
        } else {
            Ok(vec![])
        }
    }

    /// Fetches and decompresses the delta manifests in `[start_version, end_version)`,
    /// reading at most `FETCH_MANIFEST_PARALLELISM` files concurrently.
    pub async fn fetch_manifests(
        &self,
        start_version: ManifestVersion,
        end_version: ManifestVersion,
    ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
        let manifests = self.scan(start_version, end_version).await?;

        let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

        let tasks = manifests.iter().map(|(v, entry)| async {
            // The semaphore bounds how many manifest files are read at the same time.
            let _permit = semaphore.acquire().await.unwrap();

            let compress_type = file_compress_type(entry.name());
            let bytes = self
                .object_store
                .read(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let data = compress_type
                .decode(bytes)
                .await
                .context(DecompressObjectSnafu {
                    compress_type,
                    path: entry.path(),
                })?;
            Ok((*v, data))
        });

        try_join_all(tasks).await
    }

    /// Deletes manifest files (delta and checkpoint) with version smaller than `end`.
    ///
    /// If `keep_last_checkpoint` is true, the latest checkpoint in that range and the
    /// delta files newer than it are kept. Returns the number of deleted files.
    pub async fn delete_until(
        &self,
        end: ManifestVersion,
        keep_last_checkpoint: bool,
    ) -> Result<usize> {
        // Collect all delta and checkpoint files with version smaller than `end`.
        let entries: Vec<_> = self
            .get_paths(|entry| {
                let file_name = entry.name();
                let is_checkpoint = is_checkpoint_file(file_name);
                if is_delta_file(file_name) || is_checkpoint {
                    let version = file_version(file_name);
                    if version < end {
                        return Some((entry, is_checkpoint, version));
                    }
                }
                None
            })
            .await?;
        // Version of the latest checkpoint to keep, if any.
        let checkpoint_version = if keep_last_checkpoint {
            entries
                .iter()
                .filter_map(|(_e, is_checkpoint, version)| {
                    if *is_checkpoint {
                        Some(version)
                    } else {
                        None
                    }
                })
                .max()
        } else {
            None
        };
        let del_entries: Vec<_> = entries
            .iter()
            .filter(|(_e, is_checkpoint, version)| {
                if let Some(max_version) = checkpoint_version {
                    if *is_checkpoint {
                        // Delete only the checkpoints older than the kept checkpoint.
                        version < max_version
                    } else {
                        // Delta files up to and including the kept checkpoint's version
                        // are covered by it and can be deleted.
                        version <= max_version
                    }
                } else {
                    true
                }
            })
            .collect();
        let paths = del_entries
            .iter()
            .map(|(e, _, _)| e.path().to_string())
            .collect::<Vec<_>>();
        let ret = paths.len();

        debug!(
            "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
            ret,
            self.path,
            end,
            checkpoint_version,
            paths,
        );

        self.object_store
            .delete_iter(paths)
            .await
            .context(OpenDalSnafu)?;

        // Remove the deleted files from the size map.
        for (_, is_checkpoint, version) in &del_entries {
            if *is_checkpoint {
                self.unset_file_size(&FileKey::Checkpoint(*version));
            } else {
                self.unset_file_size(&FileKey::Delta(*version));
            }
        }

        Ok(ret)
    }

    /// Saves a delta manifest of the given version, compressed with the configured
    /// compression type.
    pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
        let path = self.delta_file_path(version);
        debug!("Save log to manifest storage, version: {}", version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;
        let delta_size = data.len();
        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;
        self.set_delta_file_size(version, delta_size as u64);
        Ok(())
    }

    /// Saves a checkpoint of the given version and then updates the `_last_checkpoint`
    /// metadata file to point to it.
    pub(crate) async fn save_checkpoint(
        &self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;
        let checkpoint_size = data.len();
        // The checksum is computed over the uncompressed checkpoint data.
        let checksum = checkpoint_checksum(bytes);
        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;
        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        // Record the latest checkpoint version and checksum in `_last_checkpoint`.
        let last_checkpoint_path = self.last_checkpoint_path();

        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            checksum: Some(checksum),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Save checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;
        self.object_store
            .write(&last_checkpoint_path, bytes)
            .await
            .context(OpenDalSnafu)?;

        Ok(())
    }

    /// Loads the checkpoint described by `metadata` and verifies its checksum.
    ///
    /// If the checkpoint is not found with the configured compression type, falls back
    /// to `FALL_BACK_COMPRESS_TYPE` for backward compatibility with uncompressed files.
    async fn load_checkpoint(
        &mut self,
        metadata: CheckpointMetadata,
    ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        let version = metadata.version;
        let path = self.checkpoint_file_path(version);
        let checkpoint_data =
            match self.object_store.read(&path).await {
                Ok(checkpoint) => {
                    let checkpoint_size = checkpoint.len();
                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
                        DecompressObjectSnafu {
                            compress_type: self.compress_type,
                            path,
                        },
                    )?;
                    verify_checksum(&decompress_data, metadata.checksum)?;
                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
                    Ok(Some(decompress_data))
                }
                Err(e) => {
                    if e.kind() == ErrorKind::NotFound {
                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
                            // The file may have been written before compression was
                            // enabled, so retry with the fallback (uncompressed) path.
                            let fall_back_path = gen_path(
                                &self.path,
                                &checkpoint_file(version),
                                FALL_BACK_COMPRESS_TYPE,
                            );
                            debug!(
                                "Failed to load checkpoint from path: {}, fall back to path: {}",
                                path, fall_back_path
                            );
                            match self.object_store.read(&fall_back_path).await {
                                Ok(checkpoint) => {
                                    let checkpoint_size = checkpoint.len();
                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
                                        .decode(checkpoint)
                                        .await
                                        .context(DecompressObjectSnafu {
                                            compress_type: FALL_BACK_COMPRESS_TYPE,
                                            path,
                                        })?;
                                    verify_checksum(&decompress_data, metadata.checksum)?;
                                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
                                    Ok(Some(decompress_data))
                                }
                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
                                Err(e) => Err(e).context(OpenDalSnafu),
                            }
                        } else {
                            Ok(None)
                        }
                    } else {
                        Err(e).context(OpenDalSnafu)
                    }
                }
            }?;
        Ok(checkpoint_data.map(|data| (version, data)))
    }

    /// Loads the latest checkpoint according to the `_last_checkpoint` metadata file,
    /// or `None` if no checkpoint has been saved yet.
    pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
        let last_checkpoint_path = self.last_checkpoint_path();
        let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
            Ok(data) => data,
            Err(e) if e.kind() == ErrorKind::NotFound => {
                return Ok(None);
            }
            Err(e) => {
                return Err(e).context(OpenDalSnafu)?;
            }
        };

        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;

        debug!(
            "Load checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        self.load_checkpoint(checkpoint_metadata).await
    }

    #[cfg(test)]
    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
        self.object_store
            .read(path)
            .await
            .context(OpenDalSnafu)
            .map(|v| v.to_vec())
    }

    #[cfg(test)]
    pub async fn write_last_checkpoint(
        &mut self,
        version: ManifestVersion,
        bytes: &[u8],
    ) -> Result<()> {
        let path = self.checkpoint_file_path(version);
        let data = self
            .compress_type
            .encode(bytes)
            .await
            .context(CompressObjectSnafu {
                compress_type: self.compress_type,
                path: &path,
            })?;

        let checkpoint_size = data.len();

        self.object_store
            .write(&path, data)
            .await
            .context(OpenDalSnafu)?;

        self.set_checkpoint_file_size(version, checkpoint_size as u64);

        let last_checkpoint_path = self.last_checkpoint_path();
        let checkpoint_metadata = CheckpointMetadata {
            size: bytes.len(),
            version,
            // Fixed checksum value used by tests.
            checksum: Some(1218259706),
            extend_metadata: HashMap::new(),
        };

        debug!(
            "Rewrite checkpoint in path: {}, metadata: {:?}",
            last_checkpoint_path, checkpoint_metadata
        );

        let bytes = checkpoint_metadata.encode()?;

        self.object_store
            .write(&last_checkpoint_path, bytes.clone())
            .await
            .context(OpenDalSnafu)?;
        Ok(())
    }

    /// Returns the total size of all manifest files tracked in the size map.
    pub(crate) fn total_manifest_size(&self) -> u64 {
        self.manifest_size_map.read().unwrap().values().sum()
    }

    /// Clears the size map and resets the total manifest size to zero.
    pub(crate) fn reset_manifest_size(&mut self) {
        self.manifest_size_map.write().unwrap().clear();
        self.total_manifest_size.store(0, Ordering::Relaxed);
    }

    /// Records the size of the delta file of the given version.
    pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
        let mut m = self.manifest_size_map.write().unwrap();
        m.insert(FileKey::Delta(version), size);

        self.inc_total_manifest_size(size);
    }

    /// Records the size of the checkpoint file of the given version.
    pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
        let mut m = self.manifest_size_map.write().unwrap();
        m.insert(FileKey::Checkpoint(version), size);

        self.inc_total_manifest_size(size);
    }

    /// Removes a file from the size map and subtracts its size from the total.
    fn unset_file_size(&self, key: &FileKey) {
        let mut m = self.manifest_size_map.write().unwrap();
        if let Some(val) = m.remove(key) {
            debug!("Unset file size: {:?}, size: {}", key, val);
            self.dec_total_manifest_size(val);
        }
    }

    fn inc_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
    }

    fn dec_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
    }
}

/// Metadata of the latest checkpoint, stored in the `_last_checkpoint` file as JSON.
#[derive(Serialize, Deserialize, Debug)]
struct CheckpointMetadata {
    pub size: usize,
    pub version: ManifestVersion,
    /// CRC32 checksum of the uncompressed checkpoint data, if recorded.
    pub checksum: Option<u32>,
    pub extend_metadata: HashMap<String, String>,
}

impl CheckpointMetadata {
    fn encode(&self) -> Result<Vec<u8>> {
        Ok(serde_json::to_string(self)
            .context(SerdeJsonSnafu)?
            .into_bytes())
    }

    fn decode(bs: &[u8]) -> Result<Self> {
        let data = std::str::from_utf8(bs).context(Utf8Snafu)?;

        serde_json::from_str(data).context(SerdeJsonSnafu)
    }
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;
    use object_store::ObjectStore;

    use super::*;

    fn new_test_manifest_store() -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
        )
    }

    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

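    // Added example (not part of the original tests): a minimal sketch checking that
    // `CheckpointMetadata` round-trips through its JSON `encode`/`decode` helpers.
    #[test]
    fn test_checkpoint_metadata_roundtrip() {
        let metadata = new_checkpoint_metadata_with_version(7);
        let bytes = metadata.encode().unwrap();
        let decoded = CheckpointMetadata::decode(&bytes).unwrap();
        assert_eq!(decoded.size, 0);
        assert_eq!(decoded.version, 7);
        assert_eq!(decoded.checksum, None);
        assert!(decoded.extend_metadata.is_empty());
    }
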
    #[test]
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

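    // Added example (not part of the original tests): a small sketch of the file-name
    // helpers. It assumes `CompressionType::from_str("gz")` parses to `Gzip`, which the
    // backward-compatibility test below also relies on implicitly.
    #[test]
    fn test_file_name_helpers() {
        let delta = delta_file(7);
        assert!(is_delta_file(&delta));
        assert!(!is_checkpoint_file(&delta));
        // An uncompressed delta file carries no extra compression extension.
        assert_eq!(file_compress_type(&delta), CompressionType::Uncompressed);

        let checkpoint = checkpoint_file(7);
        assert!(is_checkpoint_file(&checkpoint));
        assert!(!is_delta_file(&checkpoint));

        // A gzip-compressed delta file keeps its `.gz` extension in the path.
        let compressed = gen_path("", &delta, CompressionType::Gzip);
        assert!(is_delta_file(&compressed));
        assert_eq!(file_compress_type(&compressed), CompressionType::Gzip);
    }
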
    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        test_manifest_log_store_case(log_store).await;
    }

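    // Added example (not part of the original tests): `manifest_compress_type` simply
    // selects between the default (gzip) compression and the uncompressed fallback.
    #[test]
    fn test_manifest_compress_type() {
        assert_eq!(manifest_compress_type(true), CompressionType::Gzip);
        assert_eq!(
            manifest_compress_type(false),
            CompressionType::Uncompressed
        );
    }
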
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }

        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        // Save a checkpoint at version 3 and reload it.
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        let _ = log_store.delete_until(4, true).await.unwrap();
        // The checkpoint at version 3 is kept because it is the latest checkpoint.
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        let _ = log_store.delete_until(11, false).await.unwrap();
        // Everything, including the checkpoint, is deleted now.
        assert!(log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .is_none());
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // Write uncompressed manifests and an uncompressed checkpoint.
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // Enable compression; the previously written files must still be readable.
        log_store.compress_type = CompressionType::Gzip;

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // Write compressed manifests and a compressed checkpoint.
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Both compressed and uncompressed files can be read back.
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Delete everything below version 10: the ten delta files plus the checkpoint at 5.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }

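    // Added example (not part of the original tests): a minimal sketch of the checksum
    // helpers. `verify_checksum` is a no-op when no checksum is recorded, succeeds on a
    // matching CRC32, and fails on a mismatch.
    #[test]
    fn test_verify_checksum() {
        let data = b"checkpoint";
        let checksum = checkpoint_checksum(data);

        assert!(verify_checksum(data, None).is_ok());
        assert!(verify_checksum(data, Some(checksum)).is_ok());
        assert!(verify_checksum(data, Some(checksum.wrapping_add(1))).is_err());
    }
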
    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // Total size of the five delta files and the checkpoint.
        assert_eq!(log_store.total_manifest_size(), 63);

        // Delete the first three delta files.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 39);

        // Delete all remaining manifest files.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes())
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Total size of the five compressed delta files and the checkpoint.
        assert_eq!(log_store.total_manifest_size(), 181);

        // Delete the first three delta files.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 97);

        // Delete all remaining manifest files.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
}