1pub(crate) mod checkpoint;
16pub(crate) mod delta;
17pub(crate) mod size_tracker;
18pub(crate) mod staging;
19pub(crate) mod utils;
20
21use std::iter::Iterator;
22use std::str::FromStr;
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use common_datasource::compression::CompressionType;
27use common_telemetry::{debug, warn};
28use crc32fast::Hasher;
29use lazy_static::lazy_static;
30use object_store::util::join_dir;
31use object_store::{Lister, ObjectStore, util};
32use regex::Regex;
33#[cfg(test)]
34use snafu::ResultExt;
35use snafu::ensure;
36use store_api::ManifestVersion;
37use store_api::storage::RegionId;
38
39use crate::cache::manifest_cache::ManifestCache;
40#[cfg(test)]
41use crate::error::OpenDalSnafu;
42use crate::error::{ChecksumMismatchSnafu, Result};
43use crate::manifest::storage::checkpoint::CheckpointStorage;
44use crate::manifest::storage::delta::DeltaStorage;
45use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker};
46use crate::manifest::storage::staging::StagingStorage;
47use crate::manifest::storage::utils::remove_from_cache;
48
49lazy_static! {
50 static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
51 static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
52}
53
/// File name `_last_checkpoint`.
/// NOTE(review): presumably the pointer file describing the most recent
/// checkpoint — confirm against the `checkpoint` module.
pub const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
/// Compression type returned by [`manifest_compress_type`] when compression is requested.
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// Compression type returned by [`manifest_compress_type`] when compression is not requested.
pub(crate) const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the concurrency limit used when fetching manifest files — confirm at the use site.
const FETCH_MANIFEST_PARALLELISM: usize = 16;
60
61pub fn manifest_dir(region_dir: &str) -> String {
63 join_dir(region_dir, "manifest")
64}
65
66pub const fn manifest_compress_type(compress: bool) -> CompressionType {
68 if compress {
69 DEFAULT_MANIFEST_COMPRESSION_TYPE
70 } else {
71 FALL_BACK_COMPRESS_TYPE
72 }
73}
74
75pub fn delta_file(version: ManifestVersion) -> String {
76 format!("{version:020}.json")
77}
78
79pub fn checkpoint_file(version: ManifestVersion) -> String {
80 format!("{version:020}.checkpoint")
81}
82
83pub(crate) fn list_start_after(path: &str, version: ManifestVersion) -> String {
92 debug_assert!(
93 path.ends_with('/'),
94 "list_start_after: path must end with '/', got {path:?}",
95 );
96 format!("{path}{version:020}")
99}
100
101pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
102 if compress_type == CompressionType::Uncompressed {
103 format!("{}{}", path, file)
104 } else {
105 format!("{}{}.{}", path, file, compress_type.file_extension())
106 }
107}
108
109pub(crate) fn checkpoint_checksum(data: &[u8]) -> u32 {
110 let mut hasher = Hasher::new();
111 hasher.update(data);
112 hasher.finalize()
113}
114
115pub(crate) fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
116 if let Some(checksum) = wanted {
117 let calculated_checksum = checkpoint_checksum(data);
118 ensure!(
119 checksum == calculated_checksum,
120 ChecksumMismatchSnafu {
121 actual: calculated_checksum,
122 expected: checksum,
123 }
124 );
125 }
126 Ok(())
127}
128
129pub fn file_version(path: &str) -> ManifestVersion {
134 let s = path.split('.').next().unwrap();
135 s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
136}
137
138pub fn file_compress_type(path: &str) -> CompressionType {
143 let s = path.rsplit('.').next().unwrap_or("");
144 CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
145}
146
147pub fn is_delta_file(file_name: &str) -> bool {
148 DELTA_RE.is_match(file_name)
149}
150
151pub fn is_checkpoint_file(file_name: &str) -> bool {
152 CHECKPOINT_RE.is_match(file_name)
153}
154
/// Facade over the manifest files of one manifest directory.
///
/// Delta, checkpoint and staging operations are delegated to the dedicated
/// storages; this type additionally owns the size tracker and performs
/// garbage collection in `delete_until`.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Object store handle; `delete_until` issues its deletes through it.
    object_store: ObjectStore,
    /// Manifest directory, normalized with `util::normalize_dir` in `new`.
    path: String,
    /// Optional manifest cache; `delete_until` removes deleted paths from it.
    manifest_cache: Option<ManifestCache>,
    /// Tracks manifest file sizes; built in `new` from the `total_manifest_size` counter.
    size_tracker: SizeTracker,
    /// Storage used for checkpoint files.
    checkpoint_storage: CheckpointStorage<CheckpointTracker>,
    /// Storage used for delta files.
    delta_storage: DeltaStorage<DeltaTracker>,
    /// Storage used for staging manifests.
    staging_storage: StagingStorage,
}
170
171impl ManifestObjectStore {
172 pub fn new(
173 path: &str,
174 object_store: ObjectStore,
175 compress_type: CompressionType,
176 total_manifest_size: Arc<AtomicU64>,
177 manifest_cache: Option<ManifestCache>,
178 ) -> Self {
179 common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
180
181 let path = util::normalize_dir(path);
182 let size_tracker = SizeTracker::new(total_manifest_size);
183 let checkpoint_tracker = Arc::new(size_tracker.checkpoint_tracker());
184 let delta_tracker = Arc::new(size_tracker.manifest_tracker());
185 let checkpoint_storage = CheckpointStorage::new(
186 path.clone(),
187 object_store.clone(),
188 compress_type,
189 manifest_cache.clone(),
190 checkpoint_tracker,
191 );
192 let delta_storage = DeltaStorage::new(
193 path.clone(),
194 object_store.clone(),
195 compress_type,
196 manifest_cache.clone(),
197 delta_tracker,
198 );
199 let staging_storage =
200 StagingStorage::new(path.clone(), object_store.clone(), compress_type);
201
202 Self {
203 object_store,
204 path,
205 manifest_cache,
206 size_tracker,
207 checkpoint_storage,
208 delta_storage,
209 staging_storage,
210 }
211 }
212
213 pub(crate) fn manifest_dir(&self) -> &str {
215 &self.path
216 }
217
218 pub(crate) async fn manifest_lister(
224 &self,
225 is_staging: bool,
226 start_after: Option<&str>,
227 ) -> Result<Option<Lister>> {
228 if is_staging {
229 self.staging_storage.manifest_lister().await
230 } else {
231 self.delta_storage.manifest_lister(start_after).await
232 }
233 }
234
235 pub async fn fetch_manifests_strict_from(
238 &self,
239 start_version: ManifestVersion,
240 end_version: ManifestVersion,
241 region_id: RegionId,
242 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
243 self.delta_storage
244 .fetch_manifests_strict_from(start_version, end_version, region_id)
245 .await
246 }
247
248 pub async fn fetch_manifests(
253 &self,
254 start_version: ManifestVersion,
255 end_version: ManifestVersion,
256 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
257 self.delta_storage
258 .fetch_manifests(start_version, end_version)
259 .await
260 }
261
262 pub async fn delete_until(
267 &self,
268 end: ManifestVersion,
269 keep_last_checkpoint: bool,
270 ) -> Result<usize> {
271 let entries: Vec<_> = self
278 .delta_storage
279 .get_paths(None, |entry| {
280 let file_name = entry.name();
281 let is_checkpoint = is_checkpoint_file(file_name);
282 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
283 let version = file_version(file_name);
284 if version < end {
285 return Some((entry, is_checkpoint, version));
286 }
287 }
288 None
289 })
290 .await?;
291 let checkpoint_version = if keep_last_checkpoint {
292 entries
294 .iter()
295 .filter_map(
296 |(_e, is_checkpoint, version)| {
297 if *is_checkpoint { Some(version) } else { None }
298 },
299 )
300 .max()
301 } else {
302 None
303 };
304 let del_entries: Vec<_> = entries
305 .iter()
306 .filter(|(_e, is_checkpoint, version)| {
307 if let Some(max_version) = checkpoint_version {
308 if *is_checkpoint {
309 version < max_version
311 } else {
312 version <= max_version
315 }
316 } else {
317 true
318 }
319 })
320 .collect();
321 let paths = del_entries
322 .iter()
323 .map(|(e, _, _)| e.path().to_string())
324 .collect::<Vec<_>>();
325 let total = paths.len();
326
327 debug!(
328 "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
329 total, self.path, end, checkpoint_version, paths,
330 );
331
332 for (entry, _, _) in &del_entries {
334 remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await;
335 }
336
337 let mut succeeded = vec![false; del_entries.len()];
340 match self.object_store.delete_iter(paths.clone()).await {
341 Ok(()) => succeeded.fill(true),
342 Err(batch_err) => {
343 warn!(
344 batch_err;
345 "Batch delete failed for manifest path {}, falling back to per-file delete for {} paths",
346 self.path, total,
347 );
348 for (i, path) in paths.iter().enumerate() {
349 if let Err(e) = self.object_store.delete(path).await {
350 warn!(
351 e;
352 "Failed to delete manifest file {} under {}, aborting fallback, {} files will be retried on next checkpoint",
353 path, self.path, total - i,
354 );
355 break;
356 }
357 succeeded[i] = true;
358 }
359 }
360 }
361
362 let mut deleted = 0usize;
363 for (i, (_, is_checkpoint, version)) in del_entries.iter().enumerate() {
364 if !succeeded[i] {
365 continue;
366 }
367 deleted += 1;
368 if *is_checkpoint {
369 self.size_tracker
370 .remove(&size_tracker::FileKey::Checkpoint(*version));
371 } else {
372 self.size_tracker
373 .remove(&size_tracker::FileKey::Delta(*version));
374 }
375 }
376
377 Ok(deleted)
378 }
379
380 pub async fn save(
382 &mut self,
383 version: ManifestVersion,
384 bytes: &[u8],
385 is_staging: bool,
386 ) -> Result<()> {
387 if is_staging {
388 self.staging_storage.save(version, bytes).await
389 } else {
390 self.delta_storage.save(version, bytes).await
391 }
392 }
393
394 pub(crate) async fn save_checkpoint(
396 &self,
397 version: ManifestVersion,
398 bytes: &[u8],
399 ) -> Result<()> {
400 self.checkpoint_storage
401 .save_checkpoint(version, bytes)
402 .await
403 }
404
405 pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
408 self.checkpoint_storage.load_last_checkpoint().await
409 }
410
411 pub(crate) fn total_manifest_size(&self) -> u64 {
413 self.size_tracker.total()
414 }
415
416 pub(crate) fn reset_manifest_size(&mut self) {
418 self.size_tracker.reset();
419 }
420
421 pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
423 self.size_tracker.record_delta(version, size);
424 }
425
426 pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
428 self.size_tracker.record_checkpoint(version, size);
429 }
430
431 pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
433 self.staging_storage.fetch_manifests().await
434 }
435
436 pub async fn clear_staging_manifests(&mut self) -> Result<()> {
438 self.staging_storage.clear().await
439 }
440
441 pub(crate) fn staging_storage(&self) -> &StagingStorage {
443 &self.staging_storage
444 }
445}
446
/// Test-only accessors and helpers.
#[cfg(test)]
impl ManifestObjectStore {
    /// Reads the whole object at `path` from the object store.
    ///
    /// # Errors
    /// Wraps object-store failures with `OpenDalSnafu`.
    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
        let buffer = self.object_store.read(path).await.context(OpenDalSnafu)?;
        Ok(buffer.to_vec())
    }

    /// Returns the underlying checkpoint storage.
    pub(crate) fn checkpoint_storage(&self) -> &CheckpointStorage<CheckpointTracker> {
        let Self {
            checkpoint_storage, ..
        } = self;
        checkpoint_storage
    }

    /// Returns the underlying delta storage.
    pub(crate) fn delta_storage(&self) -> &DeltaStorage<DeltaTracker> {
        let Self { delta_storage, .. } = self;
        delta_storage
    }

    /// Switches the compression type of the checkpoint, delta and staging
    /// storages, in that order.
    pub(crate) fn set_compress_type(&mut self, compress_type: CompressionType) {
        let Self {
            checkpoint_storage,
            delta_storage,
            staging_storage,
            ..
        } = self;
        checkpoint_storage.set_compress_type(compress_type);
        delta_storage.set_compress_type(compress_type);
        staging_storage.set_compress_type(compress_type);
    }
}
471
472#[cfg(test)]
473mod tests {
474 use common_test_util::temp_dir::create_temp_dir;
475 use object_store::ObjectStore;
476 use object_store::services::Fs;
477
478 use super::*;
479 use crate::manifest::storage::checkpoint::CheckpointMetadata;
480
481 fn new_test_manifest_store() -> ManifestObjectStore {
482 new_test_manifest_store_at("/")
483 }
484
485 fn new_test_manifest_store_at(path: &str) -> ManifestObjectStore {
486 common_telemetry::init_default_ut_logging();
487 let tmp_dir = create_temp_dir("test_manifest_log_store");
488 let builder = Fs::default().root(&tmp_dir.path().to_string_lossy());
489 let object_store = ObjectStore::new(builder).unwrap().finish();
490 ManifestObjectStore::new(
491 path,
492 object_store,
493 CompressionType::Uncompressed,
494 Default::default(),
495 None,
496 )
497 }
498
499 fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
500 CheckpointMetadata {
501 size: 0,
502 version,
503 checksum: None,
504 extend_metadata: Default::default(),
505 }
506 }
507
/// A gzip-compressed delta path carries the `.gz` suffix after `.json`.
#[test]
fn test_compress_file_path_generation() {
    let version: ManifestVersion = 0;
    let file_name = delta_file(version);
    let file_path = gen_path("/foo/bar/", &file_name, CompressionType::Gzip);
    assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz");
}
516
517 #[tokio::test]
518 async fn test_manifest_log_store_uncompress() {
519 let mut log_store = new_test_manifest_store();
520 log_store.set_compress_type(CompressionType::Uncompressed);
521 test_manifest_log_store_case(log_store).await;
522 }
523
524 #[tokio::test]
525 async fn test_manifest_log_store_compress() {
526 let mut log_store = new_test_manifest_store();
527 log_store.set_compress_type(CompressionType::Gzip);
528 test_manifest_log_store_case(log_store).await;
529 }
530
531 async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
532 for v in 0..5 {
533 log_store
534 .save(v, format!("hello, {v}").as_bytes(), false)
535 .await
536 .unwrap();
537 }
538
539 let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
540 let mut it = manifests.into_iter();
541 for v in 1..4 {
542 let (version, bytes) = it.next().unwrap();
543 assert_eq!(v, version);
544 assert_eq!(format!("hello, {v}").as_bytes(), bytes);
545 }
546 assert!(it.next().is_none());
547
548 let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
549 let mut it = manifests.into_iter();
550 for v in 0..5 {
551 let (version, bytes) = it.next().unwrap();
552 assert_eq!(v, version);
553 assert_eq!(format!("hello, {v}").as_bytes(), bytes);
554 }
555 assert!(it.next().is_none());
556
557 assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
559 log_store
560 .save_checkpoint(3, "checkpoint".as_bytes())
561 .await
562 .unwrap();
563
564 let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
565 assert_eq!(checkpoint, "checkpoint".as_bytes());
566 assert_eq!(3, v);
567
568 let _ = log_store.delete_until(4, true).await.unwrap();
570 let _ = log_store
571 .checkpoint_storage
572 .load_checkpoint(new_checkpoint_metadata_with_version(3))
573 .await
574 .unwrap()
575 .unwrap();
576 let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
577 let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
578 let mut it = manifests.into_iter();
579
580 let (version, bytes) = it.next().unwrap();
581 assert_eq!(4, version);
582 assert_eq!("hello, 4".as_bytes(), bytes);
583 assert!(it.next().is_none());
584
585 let _ = log_store.delete_until(11, false).await.unwrap();
587 assert!(
588 log_store
589 .checkpoint_storage
590 .load_checkpoint(new_checkpoint_metadata_with_version(3))
591 .await
592 .unwrap()
593 .is_none()
594 );
595 assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
596 let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
597 let mut it = manifests.into_iter();
598
599 assert!(it.next().is_none());
600 }
601
602 #[tokio::test]
603 async fn test_compress_backward_compatible() {
605 let mut log_store = new_test_manifest_store();
606
607 log_store.set_compress_type(CompressionType::Uncompressed);
609 for v in 0..5 {
610 log_store
611 .save(v, format!("hello, {v}").as_bytes(), false)
612 .await
613 .unwrap();
614 }
615 log_store
616 .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
617 .await
618 .unwrap();
619
620 log_store.set_compress_type(CompressionType::Gzip);
622
623 let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
625 assert_eq!(v, 5);
626 assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
627
628 for v in 5..10 {
630 log_store
631 .save(v, format!("hello, {v}").as_bytes(), false)
632 .await
633 .unwrap();
634 }
635 log_store
636 .save_checkpoint(10, "checkpoint_compressed".as_bytes())
637 .await
638 .unwrap();
639
640 let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
642 let mut it = manifests.into_iter();
643
644 for v in 0..10 {
645 let (version, bytes) = it.next().unwrap();
646 assert_eq!(v, version);
647 assert_eq!(format!("hello, {v}").as_bytes(), bytes);
648 }
649 let (v, checkpoint) = log_store
650 .checkpoint_storage
651 .load_checkpoint(new_checkpoint_metadata_with_version(5))
652 .await
653 .unwrap()
654 .unwrap();
655 assert_eq!(v, 5);
656 assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
657 let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
658 assert_eq!(v, 10);
659 assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());
660
661 assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
664 let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
665 let mut it = manifests.into_iter();
666 assert!(it.next().is_none());
667 }
668
/// `file_version` parses the numeric prefix, and `delta_file` /
/// `checkpoint_file` format it back into zero-padded names.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is needed.
#[test]
fn test_file_version() {
    let version = file_version("00000000000000000007.checkpoint");
    assert_eq!(version, 7);

    let name = delta_file(version);
    assert_eq!(name, "00000000000000000007.json");

    let name = checkpoint_file(version);
    assert_eq!(name, "00000000000000000007.checkpoint");
}
680
681 #[tokio::test]
682 async fn test_uncompressed_manifest_files_size() {
683 let mut log_store = new_test_manifest_store();
684 log_store.set_compress_type(CompressionType::Uncompressed);
686 for v in 0..5 {
687 log_store
688 .save(v, format!("hello, {v}").as_bytes(), false)
689 .await
690 .unwrap();
691 }
692 log_store
694 .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
695 .await
696 .unwrap();
697
698 assert_eq!(log_store.total_manifest_size(), 63);
700
701 assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
703
704 assert_eq!(log_store.total_manifest_size(), 39);
706
707 assert_eq!(
709 log_store
710 .delete_until(ManifestVersion::MAX, false)
711 .await
712 .unwrap(),
713 3
714 );
715
716 assert_eq!(log_store.total_manifest_size(), 0);
717 }
718
719 #[tokio::test]
720 async fn test_compressed_manifest_files_size() {
721 let mut log_store = new_test_manifest_store();
722 log_store.set_compress_type(CompressionType::Gzip);
724 for v in 0..5 {
726 log_store
727 .save(v, format!("hello, {v}").as_bytes(), false)
728 .await
729 .unwrap();
730 }
731 log_store
732 .save_checkpoint(5, "checkpoint_compressed".as_bytes())
733 .await
734 .unwrap();
735
736 assert_eq!(log_store.total_manifest_size(), 181);
738
739 assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);
741
742 assert_eq!(log_store.total_manifest_size(), 97);
744
745 assert_eq!(
747 log_store
748 .delete_until(ManifestVersion::MAX, false)
749 .await
750 .unwrap(),
751 3
752 );
753
754 assert_eq!(log_store.total_manifest_size(), 0);
755 }
756
757 #[tokio::test]
758 async fn test_scan_with_start_after_uncompress() {
759 let mut log_store = new_test_manifest_store();
760 log_store.set_compress_type(CompressionType::Uncompressed);
761 test_scan_with_start_after_case(log_store).await;
762 }
763
764 #[tokio::test]
765 async fn test_scan_with_start_after_compress() {
766 let mut log_store = new_test_manifest_store();
767 log_store.set_compress_type(CompressionType::Gzip);
768 test_scan_with_start_after_case(log_store).await;
769 }
770
771 #[tokio::test]
775 async fn test_scan_with_start_after_nested_path() {
776 let mut log_store = new_test_manifest_store_at("/nested/region-1/");
777 log_store.set_compress_type(CompressionType::Uncompressed);
778 test_scan_with_start_after_case(log_store).await;
779 }
780
781 async fn test_scan_with_start_after_case(mut log_store: ManifestObjectStore) {
782 for v in 0..10 {
783 log_store
784 .save(v, format!("hello, {v}").as_bytes(), false)
785 .await
786 .unwrap();
787 }
788 log_store
791 .save_checkpoint(5, "checkpoint".as_bytes())
792 .await
793 .unwrap();
794
795 let entries = log_store.delta_storage.scan(3, 10).await.unwrap();
797 let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
798 assert_eq!(versions, vec![3, 4, 5, 6, 7, 8, 9]);
799
800 let entries = log_store.delta_storage.scan(0, 10).await.unwrap();
802 let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
803 assert_eq!(versions, (0..10).collect::<Vec<_>>());
804
805 let entries = log_store.delta_storage.scan(7, 9).await.unwrap();
807 let versions: Vec<_> = entries.iter().map(|(v, _)| *v).collect();
808 assert_eq!(versions, vec![7, 8]);
809
810 let entries = log_store
812 .delta_storage
813 .scan(10, ManifestVersion::MAX)
814 .await
815 .unwrap();
816 assert!(entries.is_empty());
817 }
818}