1use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::TryStreamExt;
25use futures::future::try_join_all;
26use lazy_static::lazy_static;
27use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
28use regex::Regex;
29use serde::{Deserialize, Serialize};
30use snafu::{ResultExt, ensure};
31use store_api::ManifestVersion;
32use store_api::storage::RegionId;
33use tokio::sync::Semaphore;
34
35use crate::error::{
36 ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
37 OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
38};
39
lazy_static! {
    // Matches names made of leading digits followed by `.json`. The pattern is not
    // anchored at the end, so names with a trailing suffix (e.g. a compression
    // extension) match as well.
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    // Matches names made of leading digits followed by `.checkpoint`. The pattern is
    // not anchored at the end, so names with a trailing suffix match as well.
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}
44
/// Name of the file, inside the manifest directory, that holds the metadata of the
/// most recently saved checkpoint.
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type returned by [`manifest_compress_type`] when compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Compression type returned when compression is disabled. Checkpoint loading also
/// retries with this type when the file for the configured type is not found.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Upper bound on the number of manifest files fetched concurrently.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
51
52pub const fn manifest_compress_type(compress: bool) -> CompressionType {
54 if compress {
55 DEFAULT_MANIFEST_COMPRESSION_TYPE
56 } else {
57 FALL_BACK_COMPRESS_TYPE
58 }
59}
60
/// Returns the delta file name for `version`: the version zero-padded to 20 digits,
/// followed by `.json` (no compression extension).
pub fn delta_file(version: ManifestVersion) -> String {
    format!("{version:020}.json")
}
64
/// Returns the checkpoint file name for `version`: the version zero-padded to 20
/// digits, followed by `.checkpoint` (no compression extension).
pub fn checkpoint_file(version: ManifestVersion) -> String {
    format!("{version:020}.checkpoint")
}
68
69pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
70 if compress_type == CompressionType::Uncompressed {
71 format!("{}{}", path, file)
72 } else {
73 format!("{}{}.{}", path, file, compress_type.file_extension())
74 }
75}
76
77fn checkpoint_checksum(data: &[u8]) -> u32 {
78 let mut hasher = Hasher::new();
79 hasher.update(data);
80 hasher.finalize()
81}
82
83fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
84 if let Some(checksum) = wanted {
85 let calculated_checksum = checkpoint_checksum(data);
86 ensure!(
87 checksum == calculated_checksum,
88 ChecksumMismatchSnafu {
89 actual: calculated_checksum,
90 expected: checksum,
91 }
92 );
93 }
94 Ok(())
95}
96
97pub fn file_version(path: &str) -> ManifestVersion {
102 let s = path.split('.').next().unwrap();
103 s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
104}
105
106pub fn file_compress_type(path: &str) -> CompressionType {
111 let s = path.rsplit('.').next().unwrap_or("");
112 CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
113}
114
/// Returns `true` if `file_name` starts with digits followed by `.json`
/// (any trailing suffix is accepted).
pub fn is_delta_file(file_name: &str) -> bool {
    DELTA_RE.is_match(file_name)
}
118
/// Returns `true` if `file_name` starts with digits followed by `.checkpoint`
/// (any trailing suffix is accepted).
pub fn is_checkpoint_file(file_name: &str) -> bool {
    CHECKPOINT_RE.is_match(file_name)
}
122
/// Key of the manifest size map: identifies a tracked file by kind and version.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta file of the given version.
    Delta(ManifestVersion),
    /// A checkpoint file of the given version.
    Checkpoint(ManifestVersion),
}
131
/// Reads and writes manifest delta and checkpoint files through an [`ObjectStore`]
/// and keeps track of the sizes of the files it has saved or loaded.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Object store all files are read from and written to.
    object_store: ObjectStore,
    /// Compression type applied when writing files and tried first when loading checkpoints.
    compress_type: CompressionType,
    /// Normalized manifest directory.
    path: String,
    /// Normalized staging manifest directory, derived from `path` in `new`.
    staging_path: String,
    /// Recorded size of each tracked file; shared between clones of this store.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Running total of recorded sizes, held in a counter supplied by the caller of `new`.
    total_manifest_size: Arc<AtomicU64>,
}
142
143impl ManifestObjectStore {
144 pub fn new(
145 path: &str,
146 object_store: ObjectStore,
147 compress_type: CompressionType,
148 total_manifest_size: Arc<AtomicU64>,
149 ) -> Self {
150 let path = util::normalize_dir(path);
151 let staging_path = {
152 let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
154 util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
155 };
156 Self {
157 object_store,
158 compress_type,
159 path,
160 staging_path,
161 manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
162 total_manifest_size,
163 }
164 }
165
166 fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
168 let base_path = if is_staging {
169 &self.staging_path
170 } else {
171 &self.path
172 };
173 gen_path(base_path, &delta_file(version), self.compress_type)
174 }
175
176 fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
178 gen_path(&self.path, &checkpoint_file(version), self.compress_type)
179 }
180
181 pub(crate) fn last_checkpoint_path(&self) -> String {
184 format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
185 }
186
187 pub(crate) fn manifest_dir(&self) -> &str {
189 &self.path
190 }
191
192 pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
194 let path = if is_staging {
195 &self.staging_path
196 } else {
197 &self.path
198 };
199 match self.object_store.lister_with(path).await {
200 Ok(streamer) => Ok(Some(streamer)),
201 Err(e) if e.kind() == ErrorKind::NotFound => {
202 debug!("Manifest directory does not exist: {}", path);
203 Ok(None)
204 }
205 Err(e) => Err(e).context(OpenDalSnafu)?,
206 }
207 }
208
209 pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
213 where
214 F: Fn(Entry) -> Option<R>,
215 {
216 let Some(streamer) = self.manifest_lister(is_staging).await? else {
217 return Ok(vec![]);
218 };
219
220 streamer
221 .try_filter_map(|e| async { Ok(filter(e)) })
222 .try_collect::<Vec<_>>()
223 .await
224 .context(OpenDalSnafu)
225 }
226
227 fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
229 entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
230 }
231
232 pub async fn scan(
234 &self,
235 start: ManifestVersion,
236 end: ManifestVersion,
237 ) -> Result<Vec<(ManifestVersion, Entry)>> {
238 ensure!(start <= end, InvalidScanIndexSnafu { start, end });
239
240 let mut entries: Vec<(ManifestVersion, Entry)> = self
241 .get_paths(
242 |entry| {
243 let file_name = entry.name();
244 if is_delta_file(file_name) {
245 let version = file_version(file_name);
246 if start <= version && version < end {
247 return Some((version, entry));
248 }
249 }
250 None
251 },
252 false,
253 )
254 .await?;
255
256 Self::sort_manifests(&mut entries);
257
258 Ok(entries)
259 }
260
261 pub async fn fetch_manifests_strict_from(
265 &self,
266 start_version: ManifestVersion,
267 end_version: ManifestVersion,
268 region_id: RegionId,
269 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
270 let mut manifests = self.fetch_manifests(start_version, end_version).await?;
271 let start_index = manifests.iter().position(|(v, _)| *v == start_version);
272 debug!(
273 "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
274 start_version,
275 end_version,
276 start_index,
277 region_id,
278 manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
279 );
280 if let Some(start_index) = start_index {
281 Ok(manifests.split_off(start_index))
282 } else {
283 Ok(vec![])
284 }
285 }
286
287 async fn fetch_manifests_from_entries(
289 &self,
290 entries: Vec<(ManifestVersion, Entry)>,
291 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
292 if entries.is_empty() {
293 return Ok(vec![]);
294 }
295
296 let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
298
299 let tasks = entries.iter().map(|(v, entry)| async {
300 let _permit = semaphore.acquire().await.unwrap();
302
303 let compress_type = file_compress_type(entry.name());
304 let bytes = self
305 .object_store
306 .read(entry.path())
307 .await
308 .context(OpenDalSnafu)?;
309 let data = compress_type
310 .decode(bytes)
311 .await
312 .context(DecompressObjectSnafu {
313 compress_type,
314 path: entry.path(),
315 })?;
316 Ok((*v, data))
317 });
318
319 try_join_all(tasks).await
320 }
321
322 pub async fn fetch_manifests(
327 &self,
328 start_version: ManifestVersion,
329 end_version: ManifestVersion,
330 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
331 let manifests = self.scan(start_version, end_version).await?;
332 self.fetch_manifests_from_entries(manifests).await
333 }
334
335 pub async fn delete_until(
340 &self,
341 end: ManifestVersion,
342 keep_last_checkpoint: bool,
343 ) -> Result<usize> {
344 let entries: Vec<_> = self
346 .get_paths(
347 |entry| {
348 let file_name = entry.name();
349 let is_checkpoint = is_checkpoint_file(file_name);
350 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
351 let version = file_version(file_name);
352 if version < end {
353 return Some((entry, is_checkpoint, version));
354 }
355 }
356 None
357 },
358 false,
359 )
360 .await?;
361 let checkpoint_version = if keep_last_checkpoint {
362 entries
364 .iter()
365 .filter_map(
366 |(_e, is_checkpoint, version)| {
367 if *is_checkpoint { Some(version) } else { None }
368 },
369 )
370 .max()
371 } else {
372 None
373 };
374 let del_entries: Vec<_> = entries
375 .iter()
376 .filter(|(_e, is_checkpoint, version)| {
377 if let Some(max_version) = checkpoint_version {
378 if *is_checkpoint {
379 version < max_version
381 } else {
382 version <= max_version
385 }
386 } else {
387 true
388 }
389 })
390 .collect();
391 let paths = del_entries
392 .iter()
393 .map(|(e, _, _)| e.path().to_string())
394 .collect::<Vec<_>>();
395 let ret = paths.len();
396
397 debug!(
398 "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
399 ret, self.path, end, checkpoint_version, paths,
400 );
401
402 self.object_store
403 .delete_iter(paths)
404 .await
405 .context(OpenDalSnafu)?;
406
407 for (_, is_checkpoint, version) in &del_entries {
409 if *is_checkpoint {
410 self.unset_file_size(&FileKey::Checkpoint(*version));
411 } else {
412 self.unset_file_size(&FileKey::Delta(*version));
413 }
414 }
415
416 Ok(ret)
417 }
418
419 pub async fn save(
421 &mut self,
422 version: ManifestVersion,
423 bytes: &[u8],
424 is_staging: bool,
425 ) -> Result<()> {
426 let path = self.delta_file_path(version, is_staging);
427 debug!("Save log to manifest storage, version: {}", version);
428 let data = self
429 .compress_type
430 .encode(bytes)
431 .await
432 .context(CompressObjectSnafu {
433 compress_type: self.compress_type,
434 path: &path,
435 })?;
436 let delta_size = data.len();
437 self.object_store
438 .write(&path, data)
439 .await
440 .context(OpenDalSnafu)?;
441 self.set_delta_file_size(version, delta_size as u64);
442 Ok(())
443 }
444
445 pub(crate) async fn save_checkpoint(
447 &self,
448 version: ManifestVersion,
449 bytes: &[u8],
450 ) -> Result<()> {
451 let path = self.checkpoint_file_path(version);
452 let data = self
453 .compress_type
454 .encode(bytes)
455 .await
456 .context(CompressObjectSnafu {
457 compress_type: self.compress_type,
458 path: &path,
459 })?;
460 let checkpoint_size = data.len();
461 let checksum = checkpoint_checksum(bytes);
462 self.object_store
463 .write(&path, data)
464 .await
465 .context(OpenDalSnafu)?;
466 self.set_checkpoint_file_size(version, checkpoint_size as u64);
467
468 let last_checkpoint_path = self.last_checkpoint_path();
470
471 let checkpoint_metadata = CheckpointMetadata {
472 size: bytes.len(),
473 version,
474 checksum: Some(checksum),
475 extend_metadata: HashMap::new(),
476 };
477
478 debug!(
479 "Save checkpoint in path: {}, metadata: {:?}",
480 last_checkpoint_path, checkpoint_metadata
481 );
482
483 let bytes = checkpoint_metadata.encode()?;
484 self.object_store
485 .write(&last_checkpoint_path, bytes)
486 .await
487 .context(OpenDalSnafu)?;
488
489 Ok(())
490 }
491
492 async fn load_checkpoint(
493 &mut self,
494 metadata: CheckpointMetadata,
495 ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
496 let version = metadata.version;
497 let path = self.checkpoint_file_path(version);
498 let checkpoint_data =
501 match self.object_store.read(&path).await {
502 Ok(checkpoint) => {
503 let checkpoint_size = checkpoint.len();
504 let decompress_data = self.compress_type.decode(checkpoint).await.context(
505 DecompressObjectSnafu {
506 compress_type: self.compress_type,
507 path,
508 },
509 )?;
510 verify_checksum(&decompress_data, metadata.checksum)?;
511 self.set_checkpoint_file_size(version, checkpoint_size as u64);
513 Ok(Some(decompress_data))
514 }
515 Err(e) => {
516 if e.kind() == ErrorKind::NotFound {
517 if self.compress_type != FALL_BACK_COMPRESS_TYPE {
518 let fall_back_path = gen_path(
519 &self.path,
520 &checkpoint_file(version),
521 FALL_BACK_COMPRESS_TYPE,
522 );
523 debug!(
524 "Failed to load checkpoint from path: {}, fall back to path: {}",
525 path, fall_back_path
526 );
527 match self.object_store.read(&fall_back_path).await {
528 Ok(checkpoint) => {
529 let checkpoint_size = checkpoint.len();
530 let decompress_data = FALL_BACK_COMPRESS_TYPE
531 .decode(checkpoint)
532 .await
533 .context(DecompressObjectSnafu {
534 compress_type: FALL_BACK_COMPRESS_TYPE,
535 path,
536 })?;
537 verify_checksum(&decompress_data, metadata.checksum)?;
538 self.set_checkpoint_file_size(version, checkpoint_size as u64);
539 Ok(Some(decompress_data))
540 }
541 Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
542 Err(e) => Err(e).context(OpenDalSnafu),
543 }
544 } else {
545 Ok(None)
546 }
547 } else {
548 Err(e).context(OpenDalSnafu)
549 }
550 }
551 }?;
552 Ok(checkpoint_data.map(|data| (version, data)))
553 }
554
555 pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
558 let last_checkpoint_path = self.last_checkpoint_path();
559 let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
560 Ok(data) => data,
561 Err(e) if e.kind() == ErrorKind::NotFound => {
562 return Ok(None);
563 }
564 Err(e) => {
565 return Err(e).context(OpenDalSnafu)?;
566 }
567 };
568
569 let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
570
571 debug!(
572 "Load checkpoint in path: {}, metadata: {:?}",
573 last_checkpoint_path, checkpoint_metadata
574 );
575
576 self.load_checkpoint(checkpoint_metadata).await
577 }
578
579 #[cfg(test)]
580 pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
581 self.object_store
582 .read(path)
583 .await
584 .context(OpenDalSnafu)
585 .map(|v| v.to_vec())
586 }
587
588 #[cfg(test)]
589 pub async fn write_last_checkpoint(
590 &mut self,
591 version: ManifestVersion,
592 bytes: &[u8],
593 ) -> Result<()> {
594 let path = self.checkpoint_file_path(version);
595 let data = self
596 .compress_type
597 .encode(bytes)
598 .await
599 .context(CompressObjectSnafu {
600 compress_type: self.compress_type,
601 path: &path,
602 })?;
603
604 let checkpoint_size = data.len();
605
606 self.object_store
607 .write(&path, data)
608 .await
609 .context(OpenDalSnafu)?;
610
611 self.set_checkpoint_file_size(version, checkpoint_size as u64);
612
613 let last_checkpoint_path = self.last_checkpoint_path();
614 let checkpoint_metadata = CheckpointMetadata {
615 size: bytes.len(),
616 version,
617 checksum: Some(1218259706),
618 extend_metadata: HashMap::new(),
619 };
620
621 debug!(
622 "Rewrite checkpoint in path: {}, metadata: {:?}",
623 last_checkpoint_path, checkpoint_metadata
624 );
625
626 let bytes = checkpoint_metadata.encode()?;
627
628 self.object_store
630 .write(&last_checkpoint_path, bytes.clone())
631 .await
632 .context(OpenDalSnafu)?;
633 Ok(())
634 }
635
636 pub(crate) fn total_manifest_size(&self) -> u64 {
638 self.manifest_size_map.read().unwrap().values().sum()
639 }
640
641 pub(crate) fn reset_manifest_size(&mut self) {
643 self.manifest_size_map.write().unwrap().clear();
644 self.total_manifest_size.store(0, Ordering::Relaxed);
645 }
646
647 pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
649 let mut m = self.manifest_size_map.write().unwrap();
650 m.insert(FileKey::Delta(version), size);
651
652 self.inc_total_manifest_size(size);
653 }
654
655 pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
657 let mut m = self.manifest_size_map.write().unwrap();
658 m.insert(FileKey::Checkpoint(version), size);
659
660 self.inc_total_manifest_size(size);
661 }
662
663 fn unset_file_size(&self, key: &FileKey) {
664 let mut m = self.manifest_size_map.write().unwrap();
665 if let Some(val) = m.remove(key) {
666 debug!("Unset file size: {:?}, size: {}", key, val);
667 self.dec_total_manifest_size(val);
668 }
669 }
670
    /// Adds `val` to the shared running-total counter (relaxed ordering).
    fn inc_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
    }
674
    /// Subtracts `val` from the shared running-total counter (relaxed ordering).
    fn dec_total_manifest_size(&self, val: u64) {
        self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
    }
678
679 pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
681 let manifest_entries = self
682 .get_paths(
683 |entry| {
684 let file_name = entry.name();
685 if is_delta_file(file_name) {
686 let version = file_version(file_name);
687 Some((version, entry))
688 } else {
689 None
690 }
691 },
692 true,
693 )
694 .await?;
695
696 let mut sorted_entries = manifest_entries;
697 Self::sort_manifests(&mut sorted_entries);
698
699 self.fetch_manifests_from_entries(sorted_entries).await
700 }
701
702 pub async fn clear_staging_manifests(&mut self) -> Result<()> {
704 self.object_store
705 .remove_all(&self.staging_path)
706 .await
707 .context(OpenDalSnafu)?;
708
709 debug!(
710 "Cleared all staging manifest files from {}",
711 self.staging_path
712 );
713
714 Ok(())
715 }
716}
717
/// Metadata of the most recent checkpoint, stored as JSON in the last-checkpoint file.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    /// Length in bytes of the checkpoint payload before compression.
    pub size: usize,
    /// Version of the checkpoint this metadata refers to.
    pub version: ManifestVersion,
    /// Checksum of the uncompressed payload; `None` disables verification on load.
    pub checksum: Option<u32>,
    /// Additional key/value metadata; always written empty by this file.
    pub extend_metadata: HashMap<String, String>,
}
726
727impl CheckpointMetadata {
728 fn encode(&self) -> Result<Vec<u8>> {
729 Ok(serde_json::to_string(self)
730 .context(SerdeJsonSnafu)?
731 .into_bytes())
732 }
733
734 fn decode(bs: &[u8]) -> Result<Self> {
735 let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
736
737 serde_json::from_str(data).context(SerdeJsonSnafu)
738 }
739}
740
741#[cfg(test)]
742mod tests {
743 use common_test_util::temp_dir::create_temp_dir;
744 use object_store::ObjectStore;
745 use object_store::services::Fs;
746
747 use super::*;
748
749 fn new_test_manifest_store() -> ManifestObjectStore {
750 common_telemetry::init_default_ut_logging();
751 let tmp_dir = create_temp_dir("test_manifest_log_store");
752 let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
753 let object_store = ObjectStore::new(builder).unwrap().finish();
754 ManifestObjectStore::new(
755 "/",
756 object_store,
757 CompressionType::Uncompressed,
758 Default::default(),
759 )
760 }
761
762 fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
763 CheckpointMetadata {
764 size: 0,
765 version,
766 checksum: None,
767 extend_metadata: Default::default(),
768 }
769 }
770
771 #[test]
772 fn test_compress_file_path_generation() {
774 let path = "/foo/bar/";
775 let version: ManifestVersion = 0;
776 let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
777 assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
778 }
779
780 #[tokio::test]
781 async fn test_manifest_log_store_uncompress() {
782 let mut log_store = new_test_manifest_store();
783 log_store.compress_type = CompressionType::Uncompressed;
784 test_manifest_log_store_case(log_store).await;
785 }
786
787 #[tokio::test]
788 async fn test_manifest_log_store_compress() {
789 let mut log_store = new_test_manifest_store();
790 log_store.compress_type = CompressionType::Gzip;
791 test_manifest_log_store_case(log_store).await;
792 }
793
    /// Shared scenario for the compressed and uncompressed store tests: saves
    /// deltas, fetches ranges, round-trips a checkpoint and exercises
    /// `delete_until` with and without keeping the last checkpoint.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        // Write deltas for versions 0..5.
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        // The end of the range is exclusive: [1, 4) yields versions 1, 2 and 3.
        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // A range wider than what exists returns every stored delta.
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        // No checkpoint has been saved yet.
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Delete everything below version 4 but keep the newest checkpoint (3).
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        // Only delta 4 survives: deltas up to the checkpoint version were removed.
        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // Delete all remaining deltas and checkpoints.
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        // The last-checkpoint file is not matched by `delete_until`, but the
        // checkpoint it points at is gone, so nothing is loaded.
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }
862
    /// Files written without compression must stay readable after the store
    /// switches to gzip, alongside newly written compressed files.
    #[tokio::test]
    async fn test_compress_backward_compatible() {
        let mut log_store = new_test_manifest_store();

        // Phase 1: write deltas 0..5 and checkpoint 5 uncompressed.
        log_store.compress_type = CompressionType::Uncompressed;
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // Phase 2: switch the store to gzip.
        log_store.compress_type = CompressionType::Gzip;

        // The uncompressed checkpoint is still found through the fallback path.
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        // Write deltas 5..10 and checkpoint 10 compressed.
        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Fetching decodes each file according to its own extension, so the mixed
        // set of uncompressed and compressed deltas reads back uniformly.
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // Ten deltas (0..10) plus checkpoint 5 are below version 10; checkpoint 10
        // itself is not.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }
928
929 #[tokio::test]
930 async fn test_file_version() {
931 let version = file_version("00000000000000000007.checkpoint");
932 assert_eq!(version, 7);
933
934 let name = delta_file(version);
935 assert_eq!(name, "00000000000000000007.json");
936
937 let name = checkpoint_file(version);
938 assert_eq!(name, "00000000000000000007.checkpoint");
939 }
940
    /// Size tracking for uncompressed files: recorded sizes equal the payload lengths.
    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Uncompressed;
        // Five deltas of 8 bytes each ("hello, N").
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        // One checkpoint of 23 bytes.
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        // 5 * 8 + 23 = 63 bytes tracked.
        assert_eq!(log_store.total_manifest_size(), 63);

        // Deltas 0, 1 and 2 are deleted.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // 63 - 3 * 8 = 39 bytes remain tracked.
        assert_eq!(log_store.total_manifest_size(), 39);

        // Deltas 3 and 4 plus checkpoint 5 are deleted.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
978
    /// Size tracking for gzip files: recorded sizes are those of the compressed data.
    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let mut log_store = new_test_manifest_store();
        log_store.compress_type = CompressionType::Gzip;
        // Five compressed deltas.
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        // One compressed checkpoint.
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        // Expected total of the compressed sizes of all six files.
        assert_eq!(log_store.total_manifest_size(), 181);

        // Deltas 0, 1 and 2 are deleted.
        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        // Expected total once the three deleted deltas are no longer tracked.
        assert_eq!(log_store.total_manifest_size(), 97);

        // Deltas 3 and 4 plus checkpoint 5 are deleted.
        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
1016}