1use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use common_datasource::compression::CompressionType;
19use common_telemetry::{debug, info};
20use futures::TryStreamExt;
21use object_store::ObjectStore;
22use snafu::{ensure, OptionExt, ResultExt};
23use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
24use store_api::metadata::RegionMetadataRef;
25
26use crate::error::{
27 self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
28};
29use crate::manifest::action::{
30 RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
31 RegionMetaActionList,
32};
33use crate::manifest::checkpointer::Checkpointer;
34use crate::manifest::storage::{
35 file_version, is_checkpoint_file, is_delta_file, ManifestObjectStore,
36};
37use crate::metrics::MANIFEST_OP_ELAPSED;
38
/// Options used to create or open a [`RegionManifestManager`].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
    /// Directory of the region's manifest files; handed to the manifest object store.
    pub manifest_dir: String,
    /// Object store backing the manifest; a clone is handed to the manifest object store.
    pub object_store: ObjectStore,
    /// Compression type handed to the manifest object store.
    pub compress_type: CompressionType,
    /// NOTE(review): presumably the number of manifest versions between two checkpoints.
    /// It is only forwarded to the `Checkpointer` as part of these options — confirm there.
    pub checkpoint_distance: u64,
}
50
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Manages the manifest of a single region.
///
/// The manager keeps the current manifest in memory, persists every update as a
/// versioned delta through the manifest object store, and can recover or catch up
/// by replaying deltas on top of the last checkpoint.
#[derive(Debug)]
pub struct RegionManifestManager {
    /// Object store wrapper used to save, list and load delta and checkpoint files.
    store: ManifestObjectStore,
    /// Latest manifest version known to this manager; the atomic is supplied by the caller.
    last_version: Arc<AtomicU64>,
    /// Checkpointer consulted after every successful update.
    checkpointer: Checkpointer,
    /// Current in-memory manifest snapshot.
    manifest: Arc<RegionManifest>,
    /// Set by `stop()`; once set, `update` is rejected, and so is `install_manifest_to`
    /// unless the target version has already been reached.
    stopped: bool,
}
125
126impl RegionManifestManager {
127 pub async fn new(
129 metadata: RegionMetadataRef,
130 options: RegionManifestOptions,
131 total_manifest_size: Arc<AtomicU64>,
132 manifest_version: Arc<AtomicU64>,
133 ) -> Result<Self> {
134 let mut store = ManifestObjectStore::new(
136 &options.manifest_dir,
137 options.object_store.clone(),
138 options.compress_type,
139 total_manifest_size,
140 );
141
142 info!(
143 "Creating region manifest in {} with metadata {:?}",
144 options.manifest_dir, metadata
145 );
146
147 let version = MIN_VERSION;
148 let mut manifest_builder = RegionManifestBuilder::default();
149 manifest_builder.apply_change(
151 version,
152 RegionChange {
153 metadata: metadata.clone(),
154 },
155 );
156 let manifest = manifest_builder.try_build()?;
157 let region_id = metadata.region_id;
158
159 debug!(
160 "Build region manifest in {}, manifest: {:?}",
161 options.manifest_dir, manifest
162 );
163
164 let action_list =
166 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
167 store.save(version, &action_list.encode()?).await?;
168
169 let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
170 manifest_version.store(version, Ordering::Relaxed);
171 Ok(Self {
172 store,
173 last_version: manifest_version,
174 checkpointer,
175 manifest: Arc::new(manifest),
176 stopped: false,
177 })
178 }
179
180 pub async fn open(
184 options: RegionManifestOptions,
185 total_manifest_size: Arc<AtomicU64>,
186 manifest_version: Arc<AtomicU64>,
187 ) -> Result<Option<Self>> {
188 let _t = MANIFEST_OP_ELAPSED
189 .with_label_values(&["open"])
190 .start_timer();
191
192 let mut store = ManifestObjectStore::new(
194 &options.manifest_dir,
195 options.object_store.clone(),
196 options.compress_type,
197 total_manifest_size,
198 );
199
200 let mut version = MIN_VERSION;
204 let checkpoint = Self::last_checkpoint(&mut store).await?;
205 let last_checkpoint_version = checkpoint
206 .as_ref()
207 .map(|(checkpoint, _)| checkpoint.last_version)
208 .unwrap_or(MIN_VERSION);
209 let mut manifest_builder = if let Some((checkpoint, _)) = checkpoint {
210 info!(
211 "Recover region manifest {} from checkpoint version {}",
212 options.manifest_dir, checkpoint.last_version
213 );
214 version = version.max(checkpoint.last_version + 1);
215 RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
216 } else {
217 info!(
218 "Checkpoint not found in {}, build manifest from scratch",
219 options.manifest_dir
220 );
221 RegionManifestBuilder::default()
222 };
223
224 let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
226
227 for (manifest_version, raw_action_list) in manifests {
228 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
229 store.set_delta_file_size(manifest_version, raw_action_list.len() as u64);
231 for action in action_list.actions {
232 match action {
233 RegionMetaAction::Change(action) => {
234 manifest_builder.apply_change(manifest_version, action);
235 }
236 RegionMetaAction::Edit(action) => {
237 manifest_builder.apply_edit(manifest_version, action);
238 }
239 RegionMetaAction::Remove(_) => {
240 debug!(
241 "Unhandled action in {}, action: {:?}",
242 options.manifest_dir, action
243 );
244 }
245 RegionMetaAction::Truncate(action) => {
246 manifest_builder.apply_truncate(manifest_version, action);
247 }
248 }
249 }
250 }
251
252 if !manifest_builder.contains_metadata() {
254 debug!("No region manifest in {}", options.manifest_dir);
255 return Ok(None);
256 }
257
258 let manifest = manifest_builder.try_build()?;
259 debug!(
260 "Recovered region manifest from {}, manifest: {:?}",
261 options.manifest_dir, manifest
262 );
263 let version = manifest.manifest_version;
264
265 let checkpointer = Checkpointer::new(
266 manifest.metadata.region_id,
267 options,
268 store.clone(),
269 last_checkpoint_version,
270 );
271 manifest_version.store(version, Ordering::Relaxed);
272 Ok(Some(Self {
273 store,
274 last_version: manifest_version,
275 checkpointer,
276 manifest: Arc::new(manifest),
277 stopped: false,
278 }))
279 }
280
/// Marks the manager as stopped.
///
/// After this call `update` fails with a region-stopped error, and so does
/// `install_manifest_to` unless its target version has already been reached.
pub async fn stop(&mut self) {
    // Only the flag is flipped here; the methods above check it themselves.
    self.stopped = true;
}
285
286 pub async fn install_manifest_to(
292 &mut self,
293 target_version: ManifestVersion,
294 ) -> Result<ManifestVersion> {
295 let _t = MANIFEST_OP_ELAPSED
296 .with_label_values(&["install_manifest_to"])
297 .start_timer();
298
299 let last_version = self.last_version();
300 if last_version >= target_version {
302 debug!(
303 "Target version {} is less than or equal to the current version {}, region: {}, skip install",
304 target_version, last_version, self.manifest.metadata.region_id
305 );
306 return Ok(last_version);
307 }
308
309 ensure!(
310 !self.stopped,
311 RegionStoppedSnafu {
312 region_id: self.manifest.metadata.region_id,
313 }
314 );
315
316 let region_id = self.manifest.metadata.region_id;
317 let mut manifests = self
319 .store
320 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
322 .await?;
323
324 if manifests.is_empty() {
331 info!(
332 "Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
333 last_version, self.manifest.metadata.region_id
334 );
335 let last_version = self.install_last_checkpoint().await?;
336 if last_version >= target_version {
338 return Ok(last_version);
339 }
340
341 manifests = self
343 .store
344 .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
346 .await?;
347 }
348
349 if manifests.is_empty() {
350 return NoManifestsSnafu {
351 region_id: self.manifest.metadata.region_id,
352 start_version: last_version + 1,
353 end_version: target_version + 1,
354 last_version,
355 }
356 .fail();
357 }
358
359 debug_assert_eq!(manifests.first().unwrap().0, last_version + 1);
360 let mut manifest_builder =
361 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
362
363 for (manifest_version, raw_action_list) in manifests {
364 self.store
365 .set_delta_file_size(manifest_version, raw_action_list.len() as u64);
366 let action_list = RegionMetaActionList::decode(&raw_action_list)?;
367 for action in action_list.actions {
368 match action {
369 RegionMetaAction::Change(action) => {
370 manifest_builder.apply_change(manifest_version, action);
371 }
372 RegionMetaAction::Edit(action) => {
373 manifest_builder.apply_edit(manifest_version, action);
374 }
375 RegionMetaAction::Remove(_) => {
376 debug!(
377 "Unhandled action for region {}, action: {:?}",
378 self.manifest.metadata.region_id, action
379 );
380 }
381 RegionMetaAction::Truncate(action) => {
382 manifest_builder.apply_truncate(manifest_version, action);
383 }
384 }
385 }
386 }
387
388 let new_manifest = manifest_builder.try_build()?;
389 ensure!(
390 new_manifest.manifest_version >= target_version,
391 InstallManifestToSnafu {
392 region_id: self.manifest.metadata.region_id,
393 target_version,
394 available_version: new_manifest.manifest_version,
395 last_version,
396 }
397 );
398
399 let version = self.last_version();
400 self.manifest = Arc::new(new_manifest);
401 let last_version = self.set_version(self.manifest.manifest_version);
402 info!(
403 "Install manifest changes from {} to {}, region: {}",
404 version, last_version, self.manifest.metadata.region_id
405 );
406
407 Ok(last_version)
408 }
409
410 pub(crate) async fn install_last_checkpoint(&mut self) -> Result<ManifestVersion> {
412 let last_version = self.last_version();
413 let Some((checkpoint, checkpoint_size)) = Self::last_checkpoint(&mut self.store).await?
414 else {
415 return NoCheckpointSnafu {
416 region_id: self.manifest.metadata.region_id,
417 last_version,
418 }
419 .fail();
420 };
421 self.store.reset_manifest_size();
422 self.store
423 .set_checkpoint_file_size(checkpoint.last_version, checkpoint_size);
424 let builder = RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint);
425 let manifest = builder.try_build()?;
426 let last_version = self.set_version(manifest.manifest_version);
427 self.manifest = Arc::new(manifest);
428 info!(
429 "Installed region manifest from checkpoint: {}, region: {}",
430 checkpoint.last_version, self.manifest.metadata.region_id
431 );
432
433 Ok(last_version)
434 }
435
436 pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
438 let _t = MANIFEST_OP_ELAPSED
439 .with_label_values(&["update"])
440 .start_timer();
441
442 ensure!(
443 !self.stopped,
444 RegionStoppedSnafu {
445 region_id: self.manifest.metadata.region_id,
446 }
447 );
448
449 let version = self.increase_version();
450 self.store.save(version, &action_list.encode()?).await?;
451
452 let mut manifest_builder =
453 RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
454 for action in action_list.actions {
455 match action {
456 RegionMetaAction::Change(action) => {
457 manifest_builder.apply_change(version, action);
458 }
459 RegionMetaAction::Edit(action) => {
460 manifest_builder.apply_edit(version, action);
461 }
462 RegionMetaAction::Remove(_) => {
463 debug!(
464 "Unhandled action for region {}, action: {:?}",
465 self.manifest.metadata.region_id, action
466 );
467 }
468 RegionMetaAction::Truncate(action) => {
469 manifest_builder.apply_truncate(version, action);
470 }
471 }
472 }
473 let new_manifest = manifest_builder.try_build()?;
474 self.manifest = Arc::new(new_manifest);
475
476 self.checkpointer
477 .maybe_do_checkpoint(self.manifest.as_ref());
478
479 Ok(version)
480 }
481
482 pub fn manifest(&self) -> Arc<RegionManifest> {
484 self.manifest.clone()
485 }
486
/// Returns the total manifest size as tracked by the manifest store.
pub fn manifest_usage(&self) -> u64 {
    self.store.total_manifest_size()
}
491
492 pub async fn has_update(&self) -> Result<bool> {
498 let last_version = self.last_version();
499
500 let streamer =
501 self.store
502 .manifest_lister()
503 .await?
504 .context(error::EmptyManifestDirSnafu {
505 manifest_dir: self.store.manifest_dir(),
506 })?;
507
508 let need_update = streamer
509 .try_any(|entry| async move {
510 let file_name = entry.name();
511 if is_delta_file(file_name) || is_checkpoint_file(file_name) {
512 let version = file_version(file_name);
513 if version > last_version {
514 return true;
515 }
516 }
517 false
518 })
519 .await
520 .context(error::OpenDalSnafu)?;
521
522 Ok(need_update)
523 }
524
525 fn increase_version(&mut self) -> ManifestVersion {
527 let previous = self.last_version.fetch_add(1, Ordering::Relaxed);
528 previous + 1
529 }
530
/// Stores `version` as the latest manifest version and returns it.
fn set_version(&mut self, version: ManifestVersion) -> ManifestVersion {
    self.last_version.store(version, Ordering::Relaxed);
    version
}
536
/// Returns the latest manifest version currently stored in the shared atomic.
fn last_version(&self) -> ManifestVersion {
    self.last_version.load(Ordering::Relaxed)
}
540
541 pub(crate) async fn last_checkpoint(
546 store: &mut ManifestObjectStore,
547 ) -> Result<Option<(RegionCheckpoint, u64)>> {
548 let last_checkpoint = store.load_last_checkpoint().await?;
549
550 if let Some((_, bytes)) = last_checkpoint {
551 let checkpoint = RegionCheckpoint::decode(&bytes)?;
552 Ok(Some((checkpoint, bytes.len() as u64)))
553 } else {
554 Ok(None)
555 }
556 }
557
/// Returns the internal checkpointer; only compiled for tests.
#[cfg(test)]
pub(crate) fn checkpointer(&self) -> &Checkpointer {
    &self.checkpointer
}
562}
563
#[cfg(test)]
impl RegionManifestManager {
    /// Asserts that the manager holds `expect` as metadata and that both the manifest
    /// version and the tracked version equal `last_version`.
    fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) {
        let tracked_version = self.last_version();
        let snapshot = self.manifest();

        assert_eq!(snapshot.metadata, *expect);
        assert_eq!(snapshot.manifest_version, tracked_version);
        assert_eq!(last_version, tracked_version);
    }

    /// Returns a clone of the underlying manifest store.
    pub fn store(&self) -> ManifestObjectStore {
        let store = &self.store;
        store.clone()
    }
}
577
578#[cfg(test)]
579mod test {
580 use std::time::Duration;
581
582 use api::v1::SemanticType;
583 use common_datasource::compression::CompressionType;
584 use common_test_util::temp_dir::create_temp_dir;
585 use datatypes::prelude::ConcreteDataType;
586 use datatypes::schema::ColumnSchema;
587 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
588
589 use super::*;
590 use crate::manifest::action::{RegionChange, RegionEdit};
591 use crate::manifest::tests::utils::basic_region_metadata;
592 use crate::test_util::TestEnv;
593
594 #[tokio::test]
595 async fn create_manifest_manager() {
596 let metadata = Arc::new(basic_region_metadata());
597 let env = TestEnv::new();
598 let manager = env
599 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
600 .await
601 .unwrap()
602 .unwrap();
603
604 manager.validate_manifest(&metadata, 0);
605 }
606
607 #[tokio::test]
608 async fn open_manifest_manager() {
609 let env = TestEnv::new();
610 assert!(env
612 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
613 .await
614 .unwrap()
615 .is_none());
616
617 let metadata = Arc::new(basic_region_metadata());
619 let mut manager = env
620 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
621 .await
622 .unwrap()
623 .unwrap();
624 manager.stop().await;
626
627 let manager = env
629 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
630 .await
631 .unwrap()
632 .unwrap();
633
634 manager.validate_manifest(&metadata, 0);
635 }
636
637 #[tokio::test]
638 async fn region_change_add_column() {
639 let metadata = Arc::new(basic_region_metadata());
640 let env = TestEnv::new();
641 let mut manager = env
642 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
643 .await
644 .unwrap()
645 .unwrap();
646
647 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
648 new_metadata_builder.push_column_metadata(ColumnMetadata {
649 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
650 semantic_type: SemanticType::Field,
651 column_id: 252,
652 });
653 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
654
655 let action_list =
656 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
657 metadata: new_metadata.clone(),
658 }));
659
660 let current_version = manager.update(action_list).await.unwrap();
661 assert_eq!(current_version, 1);
662 manager.validate_manifest(&new_metadata, 1);
663
664 manager.stop().await;
666 let manager = env
667 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
668 .await
669 .unwrap()
670 .unwrap();
671 manager.validate_manifest(&new_metadata, 1);
672 }
673
674 async fn manifest_dir_usage(path: &str) -> u64 {
676 let mut size = 0;
677 let mut read_dir = tokio::fs::read_dir(path).await.unwrap();
678 while let Ok(dir_entry) = read_dir.next_entry().await {
679 let Some(entry) = dir_entry else {
680 break;
681 };
682 if entry.file_type().await.unwrap().is_file() {
683 let file_name = entry.file_name().into_string().unwrap();
684 if file_name.contains(".checkpoint") || file_name.contains(".json") {
685 let file_size = entry.metadata().await.unwrap().len() as usize;
686 debug!("File: {file_name:?}, size: {file_size}");
687 size += file_size;
688 }
689 }
690 }
691 size as u64
692 }
693
694 #[tokio::test]
695 async fn test_manifest_size() {
696 let metadata = Arc::new(basic_region_metadata());
697 let data_home = create_temp_dir("");
698 let data_home_path = data_home.path().to_str().unwrap().to_string();
699 let env = TestEnv::with_data_home(data_home);
700
701 let manifest_dir = format!("{}/manifest", data_home_path);
702
703 let mut manager = env
704 .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
705 .await
706 .unwrap()
707 .unwrap();
708
709 let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone());
710 new_metadata_builder.push_column_metadata(ColumnMetadata {
711 column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
712 semantic_type: SemanticType::Field,
713 column_id: 252,
714 });
715 let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
716
717 let action_list =
718 RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
719 metadata: new_metadata.clone(),
720 }));
721
722 let current_version = manager.update(action_list).await.unwrap();
723 assert_eq!(current_version, 1);
724 manager.validate_manifest(&new_metadata, 1);
725
726 let manifest_size = manager.manifest_usage();
728 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
729
730 for _ in 0..10 {
732 manager
733 .update(RegionMetaActionList::new(vec![RegionMetaAction::Edit(
734 RegionEdit {
735 files_to_add: vec![],
736 files_to_remove: vec![],
737 compaction_time_window: None,
738 flushed_entry_id: None,
739 flushed_sequence: None,
740 },
741 )]))
742 .await
743 .unwrap();
744 }
745
746 while manager.checkpointer.is_doing_checkpoint() {
747 tokio::time::sleep(Duration::from_millis(10)).await;
748 }
749
750 let manifest_size = manager.manifest_usage();
752 assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
753
754 manager.stop().await;
757 let manager = env
758 .create_manifest_manager(CompressionType::Uncompressed, 10, None)
759 .await
760 .unwrap()
761 .unwrap();
762 manager.validate_manifest(&new_metadata, 11);
763
764 let manifest_size = manager.manifest_usage();
766 assert_eq!(manifest_size, 1204);
767 }
768}