use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::RegionFileId;
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

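/// Fetches the partition expression (in JSON form) of a region; used to backfill
/// region metadata recovered from manifests that do not carry a partition expression.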
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

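/// Builder that creates a new region or opens an existing region from its manifest.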
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
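    /// Returns a new opener for the region under `table_dir`.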
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

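    /// Builds the region metadata from the configured builder and region options.
    ///
    /// # Panics
    /// Panics if the options or the metadata builder is not set.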
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

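    /// Opens the region if it already exists, otherwise writes a new manifest and
    /// creates it. Falls back to creation when opening fails.
    ///
    /// # Panics
    /// Panics if the metadata builder or the options is not set.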
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        let mut options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_experimental_flat_format {
            options.sst_format = Some(FormatType::Flat);
            FormatType::Flat
        } else {
            options.sst_format = Some(FormatType::PrimaryKey);
            FormatType::PrimaryKey
        };
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            sst_format,
            &self.stats,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!(
            "Create region {} with options: {:?}, default_flat_format: {}",
            region_id, options, config.default_experimental_flat_format
        );

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(Arc::new(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_file_purger(
                config.gc.enable,
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
        }))
    }

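    /// Opens an existing region, returning an error if the region directory is empty.
    ///
    /// # Panics
    /// Panics if the options is not set.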
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .with_context(|| EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

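    /// Returns the WAL provider for the region and ensures the configured WAL options
    /// are compatible with the log store type `S`.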
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

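    /// Tries to open the region from an existing manifest, replaying the WAL unless
    /// replay is skipped. Returns `Ok(None)` if no manifest is found.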
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegionRef>> {
        let now = Instant::now();
        let mut region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &self.stats).await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };
        sanitize_region_options(&manifest, &mut region_options);

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_file_purger(
            config.gc.enable,
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            file_purger.clone(),
            mutable,
            region_options,
        );
        let version = version_builder.build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control: version_control.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats.clone(),
        };

        let region = Arc::new(region);

        maybe_load_cache(&region, config, &self.cache_manager);

        Ok(Some(region))
    }

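    /// Returns the manifest options for the region.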
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                enable_gc: config.gc.enable,
            },
        })
    }
}

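/// Creates a [VersionBuilder] from the region manifest, metadata and options.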
pub(crate) fn version_builder_from_manifest(
    manifest: &RegionManifest,
    metadata: RegionMetadataRef,
    file_purger: FilePurgerRef,
    mutable: TimePartitionsRef,
    region_options: RegionOptions,
) -> VersionBuilder {
    VersionBuilder::new(metadata, mutable)
        .add_files(file_purger, manifest.files.values().cloned())
        .flushed_entry_id(manifest.flushed_entry_id)
        .flushed_sequence(manifest.flushed_sequence)
        .truncated_entry_id(manifest.truncated_entry_id)
        .compaction_time_window(manifest.compaction_time_window)
        .options(region_options)
}

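/// Overrides the SST format in the options with the format recorded in the manifest
/// when they differ.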
pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
    let option_format = options.sst_format.unwrap_or_default();
    if option_format != manifest.sst_format {
        common_telemetry::warn!(
            "Overriding SST format from {:?} to {:?} for region {}",
            option_format,
            manifest.sst_format,
            manifest.metadata.region_id,
        );
        options.sst_format = Some(manifest.sst_format);
    }
}

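/// Returns the object store named `name` from the manager, or the default object store
/// if `name` is `None`.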
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

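/// Loader that reads a region's manifest and metadata from its region directory.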
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

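    /// Loads the region metadata from the manifest stored under `region_dir`.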
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

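    /// Loads the region manifest stored under `region_dir`, returning `None` if it
    /// does not exist.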
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

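/// Checks that the recovered region metadata matches the expected region id, columns
/// and primary key.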
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

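/// Replays the WAL entries after `flushed_entry_id` into the memtables of the region
/// and returns the last replayed entry id.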
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

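    // Entries up to `flushed_entry_id` are already persisted, so replay starts from the
    // next entry id.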
    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

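/// Returns the manifest directory under the given region directory.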
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}

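/// Background task that fills the write cache with the index files of a region.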
pub(crate) struct RegionLoadCacheTask {
    region: MitoRegionRef,
}

impl RegionLoadCacheTask {
    pub(crate) fn new(region: MitoRegionRef) -> Self {
        Self { region }
    }

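    /// Downloads the puffin index files of the region into `file_cache`, newest files
    /// first, stopping when the region state disallows it or the cache capacity is
    /// reached.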
    pub(crate) async fn fill_cache(&self, file_cache: FileCacheRef) {
        let region_id = self.region.region_id;
        let table_dir = self.region.access_layer.table_dir();
        let path_type = self.region.access_layer.path_type();
        let object_store = self.region.access_layer.object_store();
        let version_control = &self.region.version_control;

        let mut files_to_download = Vec::new();
        let mut files_already_cached = 0;

        {
            let version = version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    let file_meta = file_handle.meta_ref();
                    if file_meta.exists_index() {
                        let puffin_key = IndexKey::new(
                            file_meta.region_id,
                            file_meta.index_file_id().file_id(),
                            FileType::Puffin,
                        );

                        if !file_cache.contains_key(&puffin_key) {
                            files_to_download.push((
                                puffin_key,
                                file_meta.index_file_size,
                                file_meta.time_range.1,
                            ));
                        } else {
                            files_already_cached += 1;
                        }
                    }
                }
            }
        }

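        // Sort by max timestamp in descending order so newer files are downloaded first.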
        files_to_download.sort_by(|a, b| b.2.cmp(&a.2));

        let total_files = files_to_download.len() as i64;

        info!(
            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
            region_id, total_files, files_already_cached
        );

        CACHE_FILL_PENDING_FILES.add(total_files);

        let mut files_downloaded = 0;
        let mut files_skipped = 0;

        for (puffin_key, file_size, max_timestamp) in files_to_download {
            let current_size = file_cache.puffin_cache_size();
            let capacity = file_cache.puffin_cache_capacity();
            let region_state = self.region.state();
            if !can_load_cache(region_state) {
                info!(
                    "Stopping index cache preload due to region state: {:?}, region: {}, current_size: {}, capacity: {}",
                    region_state, region_id, current_size, capacity
                );
                break;
            }

            if current_size + file_size > capacity {
                info!(
                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
                );
                files_skipped = (total_files - files_downloaded) as usize;
                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
                break;
            }

            let index_remote_path = location::index_file_path(
                table_dir,
                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
                path_type,
            );

            match file_cache
                .download(puffin_key, &index_remote_path, object_store, file_size)
                .await
            {
                Ok(_) => {
                    debug!(
                        "Downloaded index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    files_downloaded += 1;
                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
                    CACHE_FILL_PENDING_FILES.dec();
                }
                Err(e) => {
                    warn!(
                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    CACHE_FILL_PENDING_FILES.dec();
                }
            }
        }

        info!(
            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
            region_id, total_files, files_downloaded, files_already_cached, files_skipped
        );
    }
}

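/// Schedules a [RegionLoadCacheTask] for the region if index cache preload is enabled
/// and a write cache is configured.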
fn maybe_load_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    let Some(write_cache) = cache_manager.write_cache() else {
        return;
    };

    let preload_enabled = config.preload_index_cache;
    if !preload_enabled {
        return;
    }

    let task = RegionLoadCacheTask::new(region.clone());
    write_cache.load_region_cache(task);
}

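/// Returns whether the region state still allows filling the cache.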
fn can_load_cache(state: RegionRoleState) -> bool {
    match state {
        RegionRoleState::Leader(RegionLeaderState::Writable)
        | RegionRoleState::Leader(RegionLeaderState::Staging)
        | RegionRoleState::Leader(RegionLeaderState::Altering)
        | RegionRoleState::Leader(RegionLeaderState::Editing)
        | RegionRoleState::Follower => true,
        RegionRoleState::Leader(RegionLeaderState::Downgrading)
        | RegionRoleState::Leader(RegionLeaderState::Dropping)
        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
    }
}