1pub(crate) mod checkpoint;
16pub(crate) mod delta;
17pub(crate) mod size_tracker;
18pub(crate) mod staging;
19pub(crate) mod utils;
20
21use std::iter::Iterator;
22use std::str::FromStr;
23use std::sync::Arc;
24use std::sync::atomic::AtomicU64;
25
26use common_datasource::compression::CompressionType;
27use common_telemetry::debug;
28use crc32fast::Hasher;
29use lazy_static::lazy_static;
30use object_store::util::join_dir;
31use object_store::{Lister, ObjectStore, util};
32use regex::Regex;
33use snafu::{ResultExt, ensure};
34use store_api::ManifestVersion;
35use store_api::storage::RegionId;
36
37use crate::cache::manifest_cache::ManifestCache;
38use crate::error::{ChecksumMismatchSnafu, OpenDalSnafu, Result};
39use crate::manifest::storage::checkpoint::CheckpointStorage;
40use crate::manifest::storage::delta::DeltaStorage;
41use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker};
42use crate::manifest::storage::staging::StagingStorage;
43use crate::manifest::storage::utils::remove_from_cache;
44
45lazy_static! {
46 static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
47 static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
48}
49
50pub const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
51const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
52pub(crate) const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
55const FETCH_MANIFEST_PARALLELISM: usize = 16;
56
57pub fn manifest_dir(region_dir: &str) -> String {
59 join_dir(region_dir, "manifest")
60}
61
62pub const fn manifest_compress_type(compress: bool) -> CompressionType {
64 if compress {
65 DEFAULT_MANIFEST_COMPRESSION_TYPE
66 } else {
67 FALL_BACK_COMPRESS_TYPE
68 }
69}
70
71pub fn delta_file(version: ManifestVersion) -> String {
72 format!("{version:020}.json")
73}
74
75pub fn checkpoint_file(version: ManifestVersion) -> String {
76 format!("{version:020}.checkpoint")
77}
78
79pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String {
80 if compress_type == CompressionType::Uncompressed {
81 format!("{}{}", path, file)
82 } else {
83 format!("{}{}.{}", path, file, compress_type.file_extension())
84 }
85}
86
87pub(crate) fn checkpoint_checksum(data: &[u8]) -> u32 {
88 let mut hasher = Hasher::new();
89 hasher.update(data);
90 hasher.finalize()
91}
92
93pub(crate) fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
94 if let Some(checksum) = wanted {
95 let calculated_checksum = checkpoint_checksum(data);
96 ensure!(
97 checksum == calculated_checksum,
98 ChecksumMismatchSnafu {
99 actual: calculated_checksum,
100 expected: checksum,
101 }
102 );
103 }
104 Ok(())
105}
106
107pub fn file_version(path: &str) -> ManifestVersion {
112 let s = path.split('.').next().unwrap();
113 s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}"))
114}
115
116pub fn file_compress_type(path: &str) -> CompressionType {
121 let s = path.rsplit('.').next().unwrap_or("");
122 CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed)
123}
124
125pub fn is_delta_file(file_name: &str) -> bool {
126 DELTA_RE.is_match(file_name)
127}
128
129pub fn is_checkpoint_file(file_name: &str) -> bool {
130 CHECKPOINT_RE.is_match(file_name)
131}
132
/// Object-store-backed storage for a region's manifest files.
///
/// Checkpoint, delta and staging manifests are handled by dedicated storages
/// that are built in `new` from the same object store, manifest directory and
/// compression type.
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
    /// Underlying object store; used directly by `delete_until` for deletions.
    object_store: ObjectStore,
    /// Manifest directory, normalized with `util::normalize_dir` in `new`.
    path: String,
    /// Optional manifest cache, also handed to the checkpoint and delta
    /// storages; entries are evicted when `delete_until` removes files.
    manifest_cache: Option<ManifestCache>,
    /// Tracks the sizes of checkpoint and delta files and their total.
    size_tracker: SizeTracker,
    /// Reads and writes checkpoint files.
    checkpoint_storage: CheckpointStorage<CheckpointTracker>,
    /// Reads and writes delta manifest files.
    delta_storage: DeltaStorage<DeltaTracker>,
    /// Reads, writes and clears staging manifest files.
    staging_storage: StagingStorage,
}
148
149impl ManifestObjectStore {
150 pub fn new(
151 path: &str,
152 object_store: ObjectStore,
153 compress_type: CompressionType,
154 total_manifest_size: Arc<AtomicU64>,
155 manifest_cache: Option<ManifestCache>,
156 ) -> Self {
157 common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
158
159 let path = util::normalize_dir(path);
160 let size_tracker = SizeTracker::new(total_manifest_size);
161 let checkpoint_tracker = Arc::new(size_tracker.checkpoint_tracker());
162 let delta_tracker = Arc::new(size_tracker.manifest_tracker());
163 let checkpoint_storage = CheckpointStorage::new(
164 path.clone(),
165 object_store.clone(),
166 compress_type,
167 manifest_cache.clone(),
168 checkpoint_tracker,
169 );
170 let delta_storage = DeltaStorage::new(
171 path.clone(),
172 object_store.clone(),
173 compress_type,
174 manifest_cache.clone(),
175 delta_tracker,
176 );
177 let staging_storage =
178 StagingStorage::new(path.clone(), object_store.clone(), compress_type);
179
180 Self {
181 object_store,
182 path,
183 manifest_cache,
184 size_tracker,
185 checkpoint_storage,
186 delta_storage,
187 staging_storage,
188 }
189 }
190
191 pub(crate) fn manifest_dir(&self) -> &str {
193 &self.path
194 }
195
196 pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
198 if is_staging {
199 self.staging_storage.manifest_lister().await
200 } else {
201 self.delta_storage.manifest_lister().await
202 }
203 }
204
205 pub async fn fetch_manifests_strict_from(
208 &self,
209 start_version: ManifestVersion,
210 end_version: ManifestVersion,
211 region_id: RegionId,
212 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
213 self.delta_storage
214 .fetch_manifests_strict_from(start_version, end_version, region_id)
215 .await
216 }
217
218 pub async fn fetch_manifests(
223 &self,
224 start_version: ManifestVersion,
225 end_version: ManifestVersion,
226 ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
227 self.delta_storage
228 .fetch_manifests(start_version, end_version)
229 .await
230 }
231
232 pub async fn delete_until(
237 &self,
238 end: ManifestVersion,
239 keep_last_checkpoint: bool,
240 ) -> Result<usize> {
241 let entries: Vec<_> = self
243 .delta_storage
244 .get_paths(|entry| {
245 let file_name = entry.name();
246 let is_checkpoint = is_checkpoint_file(file_name);
247 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
248 let version = file_version(file_name);
249 if version < end {
250 return Some((entry, is_checkpoint, version));
251 }
252 }
253 None
254 })
255 .await?;
256 let checkpoint_version = if keep_last_checkpoint {
257 entries
259 .iter()
260 .filter_map(
261 |(_e, is_checkpoint, version)| {
262 if *is_checkpoint { Some(version) } else { None }
263 },
264 )
265 .max()
266 } else {
267 None
268 };
269 let del_entries: Vec<_> = entries
270 .iter()
271 .filter(|(_e, is_checkpoint, version)| {
272 if let Some(max_version) = checkpoint_version {
273 if *is_checkpoint {
274 version < max_version
276 } else {
277 version <= max_version
280 }
281 } else {
282 true
283 }
284 })
285 .collect();
286 let paths = del_entries
287 .iter()
288 .map(|(e, _, _)| e.path().to_string())
289 .collect::<Vec<_>>();
290 let ret = paths.len();
291
292 debug!(
293 "Deleting {} logs from manifest storage path {} until {}, checkpoint_version: {:?}, paths: {:?}",
294 ret, self.path, end, checkpoint_version, paths,
295 );
296
297 for (entry, _, _) in &del_entries {
299 remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await;
300 }
301
302 self.object_store
303 .delete_iter(paths)
304 .await
305 .context(OpenDalSnafu)?;
306
307 for (_, is_checkpoint, version) in &del_entries {
309 if *is_checkpoint {
310 self.size_tracker
311 .remove(&size_tracker::FileKey::Checkpoint(*version));
312 } else {
313 self.size_tracker
314 .remove(&size_tracker::FileKey::Delta(*version));
315 }
316 }
317
318 Ok(ret)
319 }
320
321 pub async fn save(
323 &mut self,
324 version: ManifestVersion,
325 bytes: &[u8],
326 is_staging: bool,
327 ) -> Result<()> {
328 if is_staging {
329 self.staging_storage.save(version, bytes).await
330 } else {
331 self.delta_storage.save(version, bytes).await
332 }
333 }
334
335 pub(crate) async fn save_checkpoint(
337 &self,
338 version: ManifestVersion,
339 bytes: &[u8],
340 ) -> Result<()> {
341 self.checkpoint_storage
342 .save_checkpoint(version, bytes)
343 .await
344 }
345
346 pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
349 self.checkpoint_storage.load_last_checkpoint().await
350 }
351
352 pub(crate) fn total_manifest_size(&self) -> u64 {
354 self.size_tracker.total()
355 }
356
357 pub(crate) fn reset_manifest_size(&mut self) {
359 self.size_tracker.reset();
360 }
361
362 pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
364 self.size_tracker.record_delta(version, size);
365 }
366
367 pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
369 self.size_tracker.record_checkpoint(version, size);
370 }
371
372 pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
374 self.staging_storage.fetch_manifests().await
375 }
376
377 pub async fn clear_staging_manifests(&mut self) -> Result<()> {
379 self.staging_storage.clear().await
380 }
381}
382
#[cfg(test)]
impl ManifestObjectStore {
    /// Reads the raw bytes stored at `path` in the underlying object store.
    pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
        let buffer = self.object_store.read(path).await.context(OpenDalSnafu)?;
        Ok(buffer.to_vec())
    }

    /// Exposes the checkpoint storage to tests.
    pub(crate) fn checkpoint_storage(&self) -> &CheckpointStorage<CheckpointTracker> {
        let storage = &self.checkpoint_storage;
        storage
    }

    /// Exposes the delta storage to tests.
    pub(crate) fn delta_storage(&self) -> &DeltaStorage<DeltaTracker> {
        let storage = &self.delta_storage;
        storage
    }

    /// Switches the compression type of every sub-storage at once.
    pub(crate) fn set_compress_type(&mut self, compress_type: CompressionType) {
        self.staging_storage.set_compress_type(compress_type);
        self.delta_storage.set_compress_type(compress_type);
        self.checkpoint_storage.set_compress_type(compress_type);
    }
}
407
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::ObjectStore;
    use object_store::services::Fs;

    use super::*;
    use crate::manifest::storage::checkpoint::CheckpointMetadata;

    /// Builds an uncompressed manifest store on a local `Fs` object store rooted at `root`.
    ///
    /// The caller creates the temporary directory and must keep its guard alive for the
    /// whole test. Creating the guard inside this helper would drop it on return, deleting
    /// the directory before anything is written. Later writes would then land in a
    /// directory (presumably recreated by the `Fs` backend) that no guard cleans up.
    fn new_test_manifest_store(root: &std::path::Path) -> ManifestObjectStore {
        common_telemetry::init_default_ut_logging();
        let builder = Fs::default().root(&root.to_string_lossy());
        let object_store = ObjectStore::new(builder).unwrap().finish();
        ManifestObjectStore::new(
            "/",
            object_store,
            CompressionType::Uncompressed,
            Default::default(),
            None,
        )
    }

    /// Builds checkpoint metadata that only carries `version`.
    fn new_checkpoint_metadata_with_version(version: ManifestVersion) -> CheckpointMetadata {
        CheckpointMetadata {
            size: 0,
            version,
            checksum: None,
            extend_metadata: Default::default(),
        }
    }

    #[test]
    fn test_compress_file_path_generation() {
        let path = "/foo/bar/";
        let version: ManifestVersion = 0;
        let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip);
        assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz")
    }

    #[tokio::test]
    async fn test_manifest_log_store_uncompress() {
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let mut log_store = new_test_manifest_store(tmp_dir.path());
        log_store.set_compress_type(CompressionType::Uncompressed);
        test_manifest_log_store_case(log_store).await;
    }

    #[tokio::test]
    async fn test_manifest_log_store_compress() {
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let mut log_store = new_test_manifest_store(tmp_dir.path());
        log_store.set_compress_type(CompressionType::Gzip);
        test_manifest_log_store_case(log_store).await;
    }

    /// Shared scenario: save deltas, fetch ranges, checkpoint, then delete.
    async fn test_manifest_log_store_case(mut log_store: ManifestObjectStore) {
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }

        let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 1..4 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();
        for v in 0..5 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        assert!(it.next().is_none());

        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        log_store
            .save_checkpoint(3, "checkpoint".as_bytes())
            .await
            .unwrap();

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(checkpoint, "checkpoint".as_bytes());
        assert_eq!(3, v);

        // Keeping the last checkpoint: checkpoint 3 survives, deltas 0..=3 go.
        let _ = log_store.delete_until(4, true).await.unwrap();
        let _ = log_store
            .checkpoint_storage
            .load_checkpoint(new_checkpoint_metadata_with_version(3))
            .await
            .unwrap()
            .unwrap();
        let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        let (version, bytes) = it.next().unwrap();
        assert_eq!(4, version);
        assert_eq!("hello, 4".as_bytes(), bytes);
        assert!(it.next().is_none());

        // Without keeping the checkpoint everything below 11 is removed.
        let _ = log_store.delete_until(11, false).await.unwrap();
        assert!(
            log_store
                .checkpoint_storage
                .load_checkpoint(new_checkpoint_metadata_with_version(3))
                .await
                .unwrap()
                .is_none()
        );
        assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
        let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
        let mut it = manifests.into_iter();

        assert!(it.next().is_none());
    }

    #[tokio::test]
    async fn test_compress_backward_compatible() {
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let mut log_store = new_test_manifest_store(tmp_dir.path());

        log_store.set_compress_type(CompressionType::Uncompressed);
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        log_store.set_compress_type(CompressionType::Gzip);

        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());

        for v in 5..10 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(10, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();

        for v in 0..10 {
            let (version, bytes) = it.next().unwrap();
            assert_eq!(v, version);
            assert_eq!(format!("hello, {v}").as_bytes(), bytes);
        }
        let (v, checkpoint) = log_store
            .checkpoint_storage
            .load_checkpoint(new_checkpoint_metadata_with_version(5))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(v, 5);
        assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes());
        let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
        assert_eq!(v, 10);
        assert_eq!(checkpoint, "checkpoint_compressed".as_bytes());

        // 10 deltas (0..10) plus checkpoint 5.
        assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
        let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
        let mut it = manifests.into_iter();
        assert!(it.next().is_none());
    }

    #[test]
    fn test_file_version() {
        let version = file_version("00000000000000000007.checkpoint");
        assert_eq!(version, 7);

        let name = delta_file(version);
        assert_eq!(name, "00000000000000000007.json");

        let name = checkpoint_file(version);
        assert_eq!(name, "00000000000000000007.checkpoint");
    }

    #[tokio::test]
    async fn test_uncompressed_manifest_files_size() {
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let mut log_store = new_test_manifest_store(tmp_dir.path());
        log_store.set_compress_type(CompressionType::Uncompressed);
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_uncompressed".as_bytes())
            .await
            .unwrap();

        assert_eq!(log_store.total_manifest_size(), 63);

        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 39);

        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }

    #[tokio::test]
    async fn test_compressed_manifest_files_size() {
        let tmp_dir = create_temp_dir("test_manifest_log_store");
        let mut log_store = new_test_manifest_store(tmp_dir.path());
        log_store.set_compress_type(CompressionType::Gzip);
        for v in 0..5 {
            log_store
                .save(v, format!("hello, {v}").as_bytes(), false)
                .await
                .unwrap();
        }
        log_store
            .save_checkpoint(5, "checkpoint_compressed".as_bytes())
            .await
            .unwrap();

        assert_eq!(log_store.total_manifest_size(), 181);

        assert_eq!(log_store.delete_until(3, false).await.unwrap(), 3);

        assert_eq!(log_store.total_manifest_size(), 97);

        assert_eq!(
            log_store
                .delete_until(ManifestVersion::MAX, false)
                .await
                .unwrap(),
            3
        );

        assert_eq!(log_store.total_manifest_size(), 0);
    }
}