1use std::collections::HashMap;
16use std::iter::Iterator;
17use std::str::FromStr;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, RwLock};
20
21use common_datasource::compression::CompressionType;
22use common_telemetry::debug;
23use crc32fast::Hasher;
24use futures::TryStreamExt;
25use futures::future::try_join_all;
26use lazy_static::lazy_static;
27use object_store::util::join_dir;
28use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
29use regex::Regex;
30use serde::{Deserialize, Serialize};
31use snafu::{ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::storage::RegionId;
34use tokio::sync::Semaphore;
35
36use crate::cache::manifest_cache::ManifestCache;
37use crate::error::{
38 ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
39 OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
40};
41
lazy_static! {
    // Matches delta file names: one or more digits followed by `.json`. The pattern is only
    // anchored at the start, so names carrying an extra trailing extension still match.
    static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
    // Matches checkpoint file names: one or more digits followed by `.checkpoint`. The pattern
    // is only anchored at the start, so names carrying an extra trailing extension still match.
    static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}
46
/// Name of the file, placed in the manifest directory, that stores the metadata of the most
/// recently saved checkpoint.
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type returned by `manifest_compress_type` when compression is enabled.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Compression type returned by `manifest_compress_type` when compression is disabled; also the
/// type tried as a fallback when a checkpoint is not found under the configured type.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// Number of permits of the semaphore that bounds concurrent manifest fetches.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
53
54pub fn manifest_dir(region_dir: &str) -> String {
56 join_dir(region_dir, "manifest")
57}
58
59pub const fn manifest_compress_type(compress: bool) -> CompressionType {
61 if compress {
62 DEFAULT_MANIFEST_COMPRESSION_TYPE
63 } else {
64 FALL_BACK_COMPRESS_TYPE
65 }
66}
67
68pub fn delta_file(version: ManifestVersion) -> String {
69 format!("{version:020}.json")
70}
71
72pub fn checkpoint_file(version: ManifestVersion) -> String {
73 format!("{version:020}.checkpoint")
74}
75
76pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
77 if compress_type == CompressionType::Uncompressed {
78 format!("{}{}", path, file)
79 } else {
80 format!("{}{}.{}", path, file, compress_type.file_extension())
81 }
82}
83
84fn checkpoint_checksum(data: &[u8]) -> u32 {
85 let mut hasher = Hasher::new();
86 hasher.update(data);
87 hasher.finalize()
88}
89
90fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
91 if let Some(checksum) = wanted {
92 let calculated_checksum = checkpoint_checksum(data);
93 ensure!(
94 checksum == calculated_checksum,
95 ChecksumMismatchSnafu {
96 actual: calculated_checksum,
97 expected: checksum,
98 }
99 );
100 }
101 Ok(())
102}
103
104pub fn file_version(path: &str) -> ManifestVersion {
109 let s = path.split('.').next().unwrap();
110 s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
111}
112
113pub fn file_compress_type(path: &str) -> CompressionType {
118 let s = path.rsplit('.').next().unwrap_or("");
119 CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
120}
121
122pub fn is_delta_file(file_name: &str) -> bool {
123 DELTA_RE.is_match(file_name)
124}
125
126pub fn is_checkpoint_file(file_name: &str) -> bool {
127 CHECKPOINT_RE.is_match(file_name)
128}
129
/// Key of the per-file size map: identifies a manifest file by its kind and version.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
    /// A delta file with the given version.
    Delta(ManifestVersion),
    /// A checkpoint file with the given version.
    Checkpoint(ManifestVersion),
}
138
/// Stores manifest delta and checkpoint files in an object store and tracks their sizes.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Backend used to list, read, write and delete manifest files.
    object_store: ObjectStore,
    /// Compression type applied when writing files and tried first when loading checkpoints.
    compress_type: CompressionType,
    /// Normalized manifest directory.
    path: String,
    /// Normalized directory holding staging manifests.
    staging_path: String,
    /// Recorded size of each known manifest file, keyed by file kind and version.
    manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
    /// Running total of manifest sizes, supplied by the caller of `new`.
    total_manifest_size: Arc<AtomicU64>,
    /// Optional cache of manifest file contents; never used for staging files.
    manifest_cache: Option<ManifestCache>,
}
151
152impl ManifestObjectStore {
153 pub fn new(
154 path: &str,
155 object_store: ObjectStore,
156 compress_type: CompressionType,
157 total_manifest_size: Arc<AtomicU64>,
158 manifest_cache: Option<ManifestCache>,
159 ) -> Self {
160 common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
161
162 let path = util::normalize_dir(path);
163 let staging_path = {
164 let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
166 util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
167 };
168 Self {
169 object_store,
170 compress_type,
171 path,
172 staging_path,
173 manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
174 total_manifest_size,
175 manifest_cache,
176 }
177 }
178
179 fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
181 let base_path = if is_staging {
182 &self.staging_path
183 } else {
184 &self.path
185 };
186 gen_path(base_path, &delta_file(version), self.compress_type)
187 }
188
189 fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
191 gen_path(&self.path, &checkpoint_file(version), self.compress_type)
192 }
193
194 pub(crate) fn last_checkpoint_path(&self) -> String {
197 format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
198 }
199
200 pub(crate) fn manifest_dir(&self) -> &str {
202 &self.path
203 }
204
205 pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
207 let path = if is_staging {
208 &self.staging_path
209 } else {
210 &self.path
211 };
212 match self.object_store.lister_with(path).await {
213 Ok(streamer) => Ok(Some(streamer)),
214 Err(e) if e.kind() == ErrorKind::NotFound => {
215 debug!("Manifest directory does not exist: {}", path);
216 Ok(None)
217 }
218 Err(e) => Err(e).context(OpenDalSnafu)?,
219 }
220 }
221
222 pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
226 where
227 F: Fn(Entry) -> Option<R>,
228 {
229 let Some(streamer) = self.manifest_lister(is_staging).await? else {
230 return Ok(vec![]);
231 };
232
233 streamer
234 .try_filter_map(|e| async { Ok(filter(e)) })
235 .try_collect::<Vec<_>>()
236 .await
237 .context(OpenDalSnafu)
238 }
239
240 fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
242 entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
243 }
244
245 pub async fn scan(
247 &self,
248 start: ManifestVersion,
249 end: ManifestVersion,
250 ) -> Result<Vec<(ManifestVersion, Entry)>> {
251 ensure!(start <= end, InvalidScanIndexSnafu { start, end });
252
253 let mut entries: Vec<(ManifestVersion, Entry)> = self
254 .get_paths(
255 |entry| {
256 let file_name = entry.name();
257 if is_delta_file(file_name) {
258 let version = file_version(file_name);
259 if start <= version && version < end {
260 return Some((version, entry));
261 }
262 }
263 None
264 },
265 false,
266 )
267 .await?;
268
269 Self::sort_manifests(&mut entries);
270
271 Ok(entries)
272 }
273
274 pub async fn fetch_manifests_strict_from(
278 &self,
279 start_version: ManifestVersion,
280 end_version: ManifestVersion,
281 region_id: RegionId,
282 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
283 let mut manifests = self.fetch_manifests(start_version, end_version).await?;
284 let start_index = manifests.iter().position(|(v, _)| *v == start_version);
285 debug!(
286 "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
287 start_version,
288 end_version,
289 start_index,
290 region_id,
291 manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
292 );
293 if let Some(start_index) = start_index {
294 Ok(manifests.split_off(start_index))
295 } else {
296 Ok(vec![])
297 }
298 }
299
/// Reads and decodes the manifest files behind `entries`, returning `(version, data)` pairs.
///
/// At most `FETCH_MANIFEST_PARALLELISM` fetches hold a permit at the same time. For each entry
/// the cache is consulted first (it is always skipped for staging files); on a miss the file is
/// read from the object store, decoded according to the compression type derived from its
/// file name, and then offered to the cache.
///
/// Returns an empty vector when `entries` is empty. Any read or decode failure makes the
/// whole call fail.
async fn fetch_manifests_from_entries(
    &self,
    entries: Vec<(ManifestVersion, Entry)>,
    is_staging: bool,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
    if entries.is_empty() {
        return Ok(vec![]);
    }

    // Limits how many fetches may proceed concurrently.
    let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);

    let tasks = entries.iter().map(|(v, entry)| async {
        // The semaphore is never closed in this function, so `acquire` is expected to succeed.
        let _permit = semaphore.acquire().await.unwrap();

        // The entry path doubles as the cache key.
        let cache_key = entry.path();
        if let Some(data) = self.get_from_cache(cache_key, is_staging).await {
            return Ok((*v, data));
        }

        // Cache miss: the compression type comes from the file name, not from `self`.
        let compress_type = file_compress_type(entry.name());
        let bytes = self
            .object_store
            .read(entry.path())
            .await
            .context(OpenDalSnafu)?;
        let data = compress_type
            .decode(bytes)
            .await
            .context(DecompressObjectSnafu {
                compress_type,
                path: entry.path(),
            })?;

        // Store the decoded content so later reads can skip the object store.
        self.put_to_cache(cache_key.to_string(), &data, is_staging)
            .await;

        Ok((*v, data))
    });

    try_join_all(tasks).await
}
348
349 pub async fn fetch_manifests(
354 &self,
355 start_version: ManifestVersion,
356 end_version: ManifestVersion,
357 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
358 let manifests = self.scan(start_version, end_version).await?;
359 self.fetch_manifests_from_entries(manifests, false).await
360 }
361
362 pub async fn delete_until(
367 &self,
368 end: ManifestVersion,
369 keep_last_checkpoint: bool,
370 ) -> Result<usize> {
371 let entries: Vec<_> = self
373 .get_paths(
374 |entry| {
375 let file_name = entry.name();
376 let is_checkpoint = is_checkpoint_file(file_name);
377 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
378 let version = file_version(file_name);
379 if version < end {
380 return Some((entry, is_checkpoint, version));
381 }
382 }
383 None
384 },
385 false,
386 )
387 .await?;
388 let checkpoint_version = if keep_last_checkpoint {
389 entries
391 .iter()
392 .filter_map(
393 |(_e, is_checkpoint, version)| {
394 if *is_checkpoint { Some(version) } else { None }
395 },
396 )
397 .max()
398 } else {
399 None
400 };
401 let del_entries: Vec<_> = entries
402 .iter()
403 .filter(|(_e, is_checkpoint, version)| {
404 if let Some(max_version) = checkpoint_version {
405 if *is_checkpoint {
406 version < max_version
408 } else {
409 version <= max_version
412 }
413 } else {
414 true
415 }
416 })
417 .collect();
418 let paths = del_entries
419 .iter()
420 .map(|(e, _, _)| e.path().to_string())
421 .collect::<Vec<_>>();
422 let ret = paths.len();
423
424 debug!(
425 "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
426 ret, self.path, end, checkpoint_version, paths,
427 );
428
429 for (entry, _, _) in &del_entries {
431 self.remove_from_cache(entry.path()).await;
432 }
433
434 self.object_store
435 .delete_iter(paths)
436 .await
437 .context(OpenDalSnafu)?;
438
439 for (_, is_checkpoint, version) in &del_entries {
441 if *is_checkpoint {
442 self.unset_file_size(&FileKey::Checkpoint(*version));
443 } else {
444 self.unset_file_size(&FileKey::Delta(*version));
445 }
446 }
447
448 Ok(ret)
449 }
450
451 pub async fn save(
453 &mut self,
454 version: ManifestVersion,
455 bytes: &[u8],
456 is_staging: bool,
457 ) -> Result<()> {
458 let path = self.delta_file_path(version, is_staging);
459 debug!("Save log to manifest storage, version: {}", version);
460 let data = self
461 .compress_type
462 .encode(bytes)
463 .await
464 .context(CompressObjectSnafu {
465 compress_type: self.compress_type,
466 path: &path,
467 })?;
468 let delta_size = data.len();
469
470 self.write_and_put_cache(&path, data, is_staging).await?;
471 self.set_delta_file_size(version, delta_size as u64);
472
473 Ok(())
474 }
475
476 pub(crate) async fn save_checkpoint(
478 &self,
479 version: ManifestVersion,
480 bytes: &[u8],
481 ) -> Result<()> {
482 let path = self.checkpoint_file_path(version);
483 let data = self
484 .compress_type
485 .encode(bytes)
486 .await
487 .context(CompressObjectSnafu {
488 compress_type: self.compress_type,
489 path: &path,
490 })?;
491 let checkpoint_size = data.len();
492 let checksum = checkpoint_checksum(bytes);
493
494 self.write_and_put_cache(&path, data, false).await?;
495 self.set_checkpoint_file_size(version, checkpoint_size as u64);
496
497 let last_checkpoint_path = self.last_checkpoint_path();
499
500 let checkpoint_metadata = CheckpointMetadata {
501 size: bytes.len(),
502 version,
503 checksum: Some(checksum),
504 extend_metadata: HashMap::new(),
505 };
506
507 debug!(
508 "Save checkpoint in path: {}, metadata: {:?}",
509 last_checkpoint_path, checkpoint_metadata
510 );
511
512 let bytes = checkpoint_metadata.encode()?;
513 self.object_store
514 .write(&last_checkpoint_path, bytes)
515 .await
516 .context(OpenDalSnafu)?;
517
518 Ok(())
519 }
520
521 async fn load_checkpoint(
522 &mut self,
523 metadata: CheckpointMetadata,
524 ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
525 let version = metadata.version;
526 let path = self.checkpoint_file_path(version);
527
528 if let Some(data) = self.get_from_cache(&path, false).await {
530 verify_checksum(&data, metadata.checksum)?;
531 return Ok(Some((version, data)));
532 }
533
534 let checkpoint_data = match self.object_store.read(&path).await {
537 Ok(checkpoint) => {
538 let checkpoint_size = checkpoint.len();
539 let decompress_data =
540 self.compress_type
541 .decode(checkpoint)
542 .await
543 .with_context(|_| DecompressObjectSnafu {
544 compress_type: self.compress_type,
545 path: path.clone(),
546 })?;
547 verify_checksum(&decompress_data, metadata.checksum)?;
548 self.set_checkpoint_file_size(version, checkpoint_size as u64);
550 self.put_to_cache(path, &decompress_data, false).await;
552 Ok(Some(decompress_data))
553 }
554 Err(e) => {
555 if e.kind() == ErrorKind::NotFound {
556 if self.compress_type != FALL_BACK_COMPRESS_TYPE {
557 let fall_back_path = gen_path(
558 &self.path,
559 &checkpoint_file(version),
560 FALL_BACK_COMPRESS_TYPE,
561 );
562 debug!(
563 "Failed to load checkpoint from path: {}, fall back to path: {}",
564 path, fall_back_path
565 );
566
567 if let Some(data) = self.get_from_cache(&fall_back_path, false).await {
569 verify_checksum(&data, metadata.checksum)?;
570 return Ok(Some((version, data)));
571 }
572
573 match self.object_store.read(&fall_back_path).await {
574 Ok(checkpoint) => {
575 let checkpoint_size = checkpoint.len();
576 let decompress_data = FALL_BACK_COMPRESS_TYPE
577 .decode(checkpoint)
578 .await
579 .with_context(|_| DecompressObjectSnafu {
580 compress_type: FALL_BACK_COMPRESS_TYPE,
581 path: fall_back_path.clone(),
582 })?;
583 verify_checksum(&decompress_data, metadata.checksum)?;
584 self.set_checkpoint_file_size(version, checkpoint_size as u64);
585 self.put_to_cache(fall_back_path, &decompress_data, false)
587 .await;
588 Ok(Some(decompress_data))
589 }
590 Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
591 Err(e) => Err(e).context(OpenDalSnafu),
592 }
593 } else {
594 Ok(None)
595 }
596 } else {
597 Err(e).context(OpenDalSnafu)
598 }
599 }
600 }?;
601 Ok(checkpoint_data.map(|data| (version, data)))
602 }
603
604 pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
607 let last_checkpoint_path = self.last_checkpoint_path();
608
609 let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
611 Ok(data) => data.to_vec(),
612 Err(e) if e.kind() == ErrorKind::NotFound => {
613 return Ok(None);
614 }
615 Err(e) => {
616 return Err(e).context(OpenDalSnafu)?;
617 }
618 };
619
620 let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
621
622 debug!(
623 "Load checkpoint in path: {}, metadata: {:?}",
624 last_checkpoint_path, checkpoint_metadata
625 );
626
627 self.load_checkpoint(checkpoint_metadata).await
628 }
629
/// Test helper: reads the raw content of `path` straight from the object store, bypassing the
/// cache and any decoding.
#[cfg(test)]
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
    let buffer = self.object_store.read(path).await.context(OpenDalSnafu)?;
    Ok(buffer.to_vec())
}
638
/// Test helper: writes a checkpoint for `version` and a last-checkpoint metadata record that
/// carries a fixed checksum instead of one computed from `bytes`.
///
/// The checkpoint is written directly to the object store (the cache is not touched) and its
/// encoded size is recorded.
// NOTE(review): the fixed checksum presumably matches specific test data or is meant to
// provoke a checksum mismatch — confirm against the callers.
#[cfg(test)]
pub async fn write_last_checkpoint(
    &mut self,
    version: ManifestVersion,
    bytes: &[u8],
) -> Result<()> {
    let path = self.checkpoint_file_path(version);
    let data = self
        .compress_type
        .encode(bytes)
        .await
        .context(CompressObjectSnafu {
            compress_type: self.compress_type,
            path: &path,
        })?;

    let checkpoint_size = data.len();

    self.object_store
        .write(&path, data)
        .await
        .context(OpenDalSnafu)?;

    self.set_checkpoint_file_size(version, checkpoint_size as u64);

    let last_checkpoint_path = self.last_checkpoint_path();
    let checkpoint_metadata = CheckpointMetadata {
        size: bytes.len(),
        version,
        checksum: Some(1218259706),
        extend_metadata: HashMap::new(),
    };

    debug!(
        "Rewrite checkpoint in path: {}, metadata: {:?}",
        last_checkpoint_path, checkpoint_metadata
    );

    let bytes = checkpoint_metadata.encode()?;

    // The encoded metadata is not used afterwards, so it is moved rather than cloned.
    self.object_store
        .write(&last_checkpoint_path, bytes)
        .await
        .context(OpenDalSnafu)?;
    Ok(())
}
686
687 pub(crate) fn total_manifest_size(&self) -> u64 {
689 self.manifest_size_map.read().unwrap().values().sum()
690 }
691
692 pub(crate) fn reset_manifest_size(&mut self) {
694 self.manifest_size_map.write().unwrap().clear();
695 self.total_manifest_size.store(0, Ordering::Relaxed);
696 }
697
698 pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
700 let mut m = self.manifest_size_map.write().unwrap();
701 m.insert(FileKey::Delta(version), size);
702
703 self.inc_total_manifest_size(size);
704 }
705
706 pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
708 let mut m = self.manifest_size_map.write().unwrap();
709 m.insert(FileKey::Checkpoint(version), size);
710
711 self.inc_total_manifest_size(size);
712 }
713
714 fn unset_file_size(&self, key: &FileKey) {
715 let mut m = self.manifest_size_map.write().unwrap();
716 if let Some(val) = m.remove(key) {
717 debug!("Unset file size: {:?}, size: {}", key, val);
718 self.dec_total_manifest_size(val);
719 }
720 }
721
722 fn inc_total_manifest_size(&self, val: u64) {
723 self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
724 }
725
726 fn dec_total_manifest_size(&self, val: u64) {
727 self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
728 }
729
730 pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
732 let manifest_entries = self
733 .get_paths(
734 |entry| {
735 let file_name = entry.name();
736 if is_delta_file(file_name) {
737 let version = file_version(file_name);
738 Some((version, entry))
739 } else {
740 None
741 }
742 },
743 true,
744 )
745 .await?;
746
747 let mut sorted_entries = manifest_entries;
748 Self::sort_manifests(&mut sorted_entries);
749
750 self.fetch_manifests_from_entries(sorted_entries, true)
751 .await
752 }
753
754 pub async fn clear_staging_manifests(&mut self) -> Result<()> {
756 self.object_store
757 .remove_all(&self.staging_path)
758 .await
759 .context(OpenDalSnafu)?;
760
761 debug!(
762 "Cleared all staging manifest files from {}",
763 self.staging_path
764 );
765
766 Ok(())
767 }
768
769 async fn get_from_cache(&self, key: &str, is_staging: bool) -> Option<Vec<u8>> {
773 if is_staging {
774 return None;
775 }
776 let cache = self.manifest_cache.as_ref()?;
777 cache.get_file(key).await
778 }
779
780 async fn put_to_cache(&self, key: String, data: &[u8], is_staging: bool) {
783 if is_staging {
784 return;
785 }
786 let Some(cache) = &self.manifest_cache else {
787 return;
788 };
789
790 cache.put_file(key, data.to_vec()).await;
791 }
792
793 async fn write_and_put_cache(&self, path: &str, data: Vec<u8>, is_staging: bool) -> Result<()> {
796 let cache_data = if !is_staging && self.manifest_cache.is_some() {
798 Some(data.clone())
799 } else {
800 None
801 };
802
803 self.object_store
805 .write(path, data)
806 .await
807 .context(OpenDalSnafu)?;
808
809 if let Some(data) = cache_data {
811 self.put_to_cache(path.to_string(), &data, is_staging).await;
812 }
813
814 Ok(())
815 }
816
817 async fn remove_from_cache(&self, key: &str) {
819 let Some(cache) = &self.manifest_cache else {
820 return;
821 };
822
823 cache.remove(key).await;
824 }
825}
826
/// Metadata about a checkpoint, serialized as JSON into the last-checkpoint file.
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
    /// Length of the checkpoint payload before encoding.
    pub size: usize,
    /// Manifest version of the checkpoint.
    pub version: ManifestVersion,
    /// Checksum of the payload before encoding; verification is skipped when `None`.
    pub checksum: Option<u32>,
    /// Additional key/value metadata; written as an empty map by this file.
    pub extend_metadata: HashMap<String, String>,
}
835
836impl CheckpointMetadata {
837 fn encode(&self) -> Result<Vec<u8>> {
838 Ok(serde_json::to_string(self)
839 .context(SerdeJsonSnafu)?
840 .into_bytes())
841 }
842
843 fn decode(bs: &[u8]) -> Result<Self> {
844 let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
845
846 serde_json::from_str(data).context(SerdeJsonSnafu)
847 }
848}
849
850#[cfg(test)]
851mod tests {
852 use common_test_util::temp_dir::create_temp_dir;
853 use object_store::ObjectStore;
854 use object_store::services::Fs;
855
856 use super::*;
857
858 fn new_test_manifest_store() -> ManifestObjectStore {
859 common_telemetry::init_default_ut_logging();
860 let tmp_dir = create_temp_dir("test_manifest_log_store");
861 let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
862 let object_store = ObjectStore::new(builder).unwrap().finish();
863 ManifestObjectStore::new(
864 "/",
865 object_store,
866 CompressionType::Uncompressed,
867 Default::default(),
868 None,
869 )
870 }
871
872 fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
873 CheckpointMetadata {
874 size: 0,
875 version,
876 checksum: None,
877 extend_metadata: Default::default(),
878 }
879 }
880
/// A gzip-compressed delta path gets the `.gz` extension appended.
#[test]
fn test_compress_file_path_generation() {
    let version: ManifestVersion = 0;
    let delta_name = delta_file(version);
    let file_path = gen_path("/foo/bar/", &delta_name, CompressionType::Gzip);
    assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz");
}
889
890 #[tokio::test]
891 async fn test_manifest_log_store_uncompress() {
892 let mut log_store = new_test_manifest_store();
893 log_store.compress_type = CompressionType::Uncompressed;
894 test_manifest_log_store_case(log_store).await;
895 }
896
897 #[tokio::test]
898 async fn test_manifest_log_store_compress() {
899 let mut log_store = new_test_manifest_store();
900 log_store.compress_type = CompressionType::Gzip;
901 test_manifest_log_store_case(log_store).await;
902 }
903
/// Shared scenario: save deltas, fetch ranges, save/load a checkpoint, then delete with and
/// without keeping the last checkpoint.
async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
    // Save delta files for versions 0..5.
    for v in 0..5 {
        log_store
            .save(v, format!("hello, {v}").as_bytes(), false)
            .await
            .unwrap();
    }

    // The range [1, 4) yields exactly versions 1, 2 and 3, in order.
    let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
    let mut it = manifests.into_iter();
    for v in 1..4 {
        let (version, bytes) = it.next().unwrap();
        assert_eq!(v, version);
        assert_eq!(format!("hello, {v}").as_bytes(), bytes);
    }
    assert!(it.next().is_none());

    // A range extending past the saved versions yields all five deltas.
    let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
    let mut it = manifests.into_iter();
    for v in 0..5 {
        let (version, bytes) = it.next().unwrap();
        assert_eq!(v, version);
        assert_eq!(format!("hello, {v}").as_bytes(), bytes);
    }
    assert!(it.next().is_none());

    // No checkpoint exists until one is saved.
    assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
    log_store
        .save_checkpoint(3, "checkpoint".as_bytes())
        .await
        .unwrap();

    let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
    assert_eq!(checkpoint, "checkpoint".as_bytes());
    assert_eq!(3, v);

    // Delete everything below version 4 but keep the last checkpoint (version 3).
    let _ = log_store.delete_until(4, true).await.unwrap();
    let _ = log_store
        .load_checkpoint(new_checkpoint_metadata_with_version(3))
        .await
        .unwrap()
        .unwrap();
    let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
    let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
    let mut it = manifests.into_iter();

    // Only the delta for version 4 is left.
    let (version, bytes) = it.next().unwrap();
    assert_eq!(4, version);
    assert_eq!("hello, 4".as_bytes(), bytes);
    assert!(it.next().is_none());

    // Delete everything below version 11, checkpoint included.
    let _ = log_store.delete_until(11, false).await.unwrap();
    assert!(
        log_store
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .is_none()
    );
    assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
    let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
    let mut it = manifests.into_iter();

    assert!(it.next().is_none());
}
972
/// Files written without compression stay readable after the store switches to gzip.
#[tokio::test]
async fn test_compress_backward_compatible() {
    let mut log_store = new_test_manifest_store();

    // Write deltas 0..5 and checkpoint 5 uncompressed.
    log_store.compress_type = CompressionType::Uncompressed;
    for v in 0..5 {
        log_store
            .save(v, format!("hello, {v}").as_bytes(), false)
            .await
            .unwrap();
    }
    log_store
        .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
        .await
        .unwrap();

    // Switch the store to gzip.
    log_store.compress_type = CompressionType::Gzip;

    // The uncompressed checkpoint is still found through the fallback path.
    let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
    assert_eq!(v, 5);
    assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

    // Write deltas 5..10 and checkpoint 10 compressed.
    for v in 5..10 {
        log_store
            .save(v, format!("hello, {v}").as_bytes(), false)
            .await
            .unwrap();
    }
    log_store
        .save_checkpoint(10, "checkpoint_compressed".as_bytes())
        .await
        .unwrap();

    // Both uncompressed and compressed deltas are fetched and decoded.
    let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
    let mut it = manifests.into_iter();

    for v in 0..10 {
        let (version, bytes) = it.next().unwrap();
        assert_eq!(v, version);
        assert_eq!(format!("hello, {v}").as_bytes(), bytes);
    }
    let (v, checkpoint) = log_store
        .load_checkpoint(new_checkpoint_metadata_with_version(5))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(v, 5);
    assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
    let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
    assert_eq!(v, 10);
    assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

    // Ten deltas plus the version-5 checkpoint lie below version 10.
    assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
    let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
    let mut it = manifests.into_iter();
    assert!(it.next().is_none());
}
1038
/// File names round-trip through `file_version`, `delta_file` and `checkpoint_file`.
///
/// Nothing here awaits, so this is a plain synchronous test rather than a tokio one.
#[test]
fn test_file_version() {
    let version = file_version("00000000000000000007.checkpoint");
    assert_eq!(version, 7);

    let name = delta_file(version);
    assert_eq!(name, "00000000000000000007.json");

    let name = checkpoint_file(version);
    assert_eq!(name, "00000000000000000007.checkpoint");
}
1050
/// Size accounting follows saves and deletions for uncompressed files.
#[tokio::test]
async fn test_uncompressed_manifest_files_size() {
    let mut log_store = new_test_manifest_store();
    log_store.compress_type = CompressionType::Uncompressed;
    // Write five 8-byte delta files.
    for v in 0..5 {
        log_store
            .save(v, format!("hello, {v}").as_bytes(), false)
            .await
            .unwrap();
    }
    // Write a 23-byte checkpoint.
    log_store
        .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
        .await
        .unwrap();

    // 5 * 8 + 23 = 63 bytes.
    assert_eq!(log_store.total_manifest_size(), 63);

    // Deleting versions 0..3 removes three delta files.
    assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

    // 63 - 3 * 8 = 39 bytes remain.
    assert_eq!(log_store.total_manifest_size(), 39);

    // The remaining two deltas and the checkpoint are deleted.
    assert_eq!(
        log_store
            .delete_until(ManifestVersion::MAX, false)
            .await
            .unwrap(),
        3
    );

    assert_eq!(log_store.total_manifest_size(), 0);
}
1088
/// Size accounting follows saves and deletions for gzip-compressed files; the recorded sizes
/// are those of the encoded data.
#[tokio::test]
async fn test_compressed_manifest_files_size() {
    let mut log_store = new_test_manifest_store();
    log_store.compress_type = CompressionType::Gzip;
    // Write five delta files.
    for v in 0..5 {
        log_store
            .save(v, format!("hello, {v}").as_bytes(), false)
            .await
            .unwrap();
    }
    // Write one checkpoint.
    log_store
        .save_checkpoint(5, "checkpoint_compressed".as_bytes())
        .await
        .unwrap();

    // Total encoded size of the five deltas plus the checkpoint.
    assert_eq!(log_store.total_manifest_size(), 181);

    // Deleting versions 0..3 removes three delta files.
    assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

    // Encoded size of what remains.
    assert_eq!(log_store.total_manifest_size(), 97);

    // The remaining two deltas and the checkpoint are deleted.
    assert_eq!(
        log_store
            .delete_until(ManifestVersion::MAX, false)
            .await
            .unwrap(),
        3
    );

    assert_eq!(log_store.total_manifest_size(), 0);
}
1126}