use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::normalize_dir;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCache, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// Fetches the partition expression of a region.
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    /// Returns the partition expression (as a JSON string) of the region, or `None` if unknown.
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

/// Reference counted [PartitionExprFetcher].
pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

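// A minimal sketch (not part of this crate) of how `PartitionExprFetcher` can be
// implemented when no partition expression source is available, e.g. in tests.
// The type name `NoopPartitionExprFetcher` is hypothetical:
//
//     struct NoopPartitionExprFetcher;
//
//     #[async_trait::async_trait]
//     impl PartitionExprFetcher for NoopPartitionExprFetcher {
//         async fn fetch_expr(&self, _region_id: RegionId) -> Option<String> {
//             // Nothing to fetch; `maybe_open()` keeps the manifest metadata as-is.
//             None
//         }
//     }
//
//     let fetcher: PartitionExprFetcherRef = Arc::new(NoopPartitionExprFetcher);
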
/// Builder to create a new [MitoRegion] or open an existing one.
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
    /// Returns a new opener for the region.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

    /// Sets the metadata builder used when creating a new region.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Returns the directory of the region under the table directory.
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    /// Builds the [RegionMetadata] from the metadata builder and the parsed options.
    ///
    /// # Panics
    /// Panics if the options or the metadata builder is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses the raw options and sets them for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// Sets the entry id to start WAL replay from, if known.
    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

    /// Sets a custom WAL entry reader to use during replay.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Validates and sets the options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets whether to skip the WAL replay when opening the region.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

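    // A minimal usage sketch (assumed, not taken from a caller in this crate) of the
    // opener; `config`, `wal`, and the other variables are placeholders:
    //
    //     let region = RegionOpener::new(/* region_id, table_dir, ... */)
    //         .metadata_builder(metadata_builder)
    //         .parse_options(options_map)?
    //         .cache(Some(cache_manager))
    //         .skip_wal_replay(false)
    //         .create_or_open(&config, &wal)
    //         .await?;
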
    /// Creates the region if it does not exist, otherwise opens it.
    ///
    /// The returned region is set as the leader. For a new region, the initial
    /// manifest is written before the region is returned.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        // Tries to open the region first to avoid overwriting an existing one.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                // The region already exists; checks whether the recovered metadata
                // is consistent with the expected metadata.
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // Sets the opened region writable, consistent with a newly created region.
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Creates a new region.
        let mut options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Decides the SST format: honors the explicit option, otherwise falls back
        // to the configured default.
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_experimental_flat_format {
            options.sst_format = Some(FormatType::Flat);
            FormatType::Flat
        } else {
            options.sst_format = Some(FormatType::PrimaryKey);
            FormatType::PrimaryKey
        };
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &region_dir, &object_store);
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        // Writes the initial manifest for the new region.
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            sst_format,
            &self.stats,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!(
            "Create region {} with options: {:?}, default_flat_format: {}",
            region_id, options, config.default_experimental_flat_format
        );

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(Arc::new(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // A newly created region starts in the writable leader state.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_file_purger(
                config.gc.enable,
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
            staging_partition_expr: Mutex::new(None),
        }))
    }

    /// Opens an existing region.
    ///
    /// Returns an error if the region directory is empty or the manifest is missing.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .with_context(|| EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

    /// Returns the WAL [Provider] matching the region's WAL options, ensuring the
    /// options are compatible with the log store type `S`.
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

    /// Tries to open the region, returning `Ok(None)` if the region manifest does not exist.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegionRef>> {
        let now = Instant::now();
        let mut region_options = self.options.as_ref().unwrap().clone();
        let object_storage =
            get_object_store(&region_options.storage, &self.object_store_manager)?;
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &self.stats).await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        // If the manifest has no partition expression, tries to fetch one and rebuilds the metadata.
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };
        sanitize_region_options(&manifest, &mut region_options);

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_file_purger(
            config.gc.enable,
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        // Uses the compaction time window from the options, falling back to the
        // window recorded in the manifest.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            file_purger.clone(),
            mutable,
            region_options,
        );
        let version = version_builder.build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For a remote WAL, if nothing was replayed into the memtables, remembers the
            // latest entry id of the topic.
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control: version_control.clone(),
            access_layer: access_layer.clone(),
            // An opened region is a follower by default; callers promote it to leader when needed.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats.clone(),
            staging_partition_expr: Mutex::new(None),
        };

        let region = Arc::new(region);

        maybe_load_cache(&region, config, &self.cache_manager);

        Ok(Some(region))
    }
}

/// Builds a [VersionBuilder] from the region manifest.
pub(crate) fn version_builder_from_manifest(
    manifest: &RegionManifest,
    metadata: RegionMetadataRef,
    file_purger: FilePurgerRef,
    mutable: TimePartitionsRef,
    region_options: RegionOptions,
) -> VersionBuilder {
    VersionBuilder::new(metadata, mutable)
        .add_files(file_purger, manifest.files.values().cloned())
        .flushed_entry_id(manifest.flushed_entry_id)
        .flushed_sequence(manifest.flushed_sequence)
        .truncated_entry_id(manifest.truncated_entry_id)
        .compaction_time_window(manifest.compaction_time_window)
        .options(region_options)
}

/// Aligns the region options with the manifest: if the SST format in the options
/// differs from the format recorded in the manifest, the manifest wins.
pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
    let option_format = options.sst_format.unwrap_or_default();
    if option_format != manifest.sst_format {
        common_telemetry::warn!(
            "Overriding SST format from {:?} to {:?} for region {}",
            option_format,
            manifest.sst_format,
            manifest.metadata.region_id,
        );
        options.sst_format = Some(manifest.sst_format);
    }
}

/// Returns the object store named `name`, or the default object store if `name` is `None`.
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// Loader that reads the [RegionManifest] and [RegionMetadata] of a region directory
/// without opening the region.
#[derive(Debug, Clone)]
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Returns a new loader.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region, or `None` if the manifest is missing.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self
            .load_manifest(region_dir, &region_options.storage)
            .await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region, or `None` if the manifest is missing.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        storage: &Option<String>,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let object_store = get_object_store(storage, &self.object_store_manager)?;
        let region_manifest_options =
            RegionManifestOptions::new(&self.config, region_dir, &object_store);
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &Default::default()).await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays the WAL into the memtables of the region and returns the last replayed entry id.
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    // The last entry id starts from the flushed entry id since there might be no
    // WAL entry after it; replay begins at the next entry.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        // Sets the next entry id and writes the batch to the memtables.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // Notifies the WAL that the region is opened, passing the flushed entry id to the callback.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

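// A minimal sketch (not from this crate) of calling `replay_memtable` with a no-op
// `on_region_opened` callback, e.g. in a test; the production callback is obtained
// from `Wal::on_region_opened`, and all other variables are placeholders:
//
//     let noop = |_id: RegionId, _entry: EntryId, _provider: &Provider| {
//         Box::pin(async { Ok(()) }) as BoxFuture<'static, Result<()>>
//     };
//     let last_entry_id = replay_memtable(
//         &provider, wal_entry_reader, region_id, flushed_entry_id,
//         &version_control, /* allow_stale_entries */ false, noop,
//     )
//     .await?;
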
/// Background task that preloads index (puffin) files of a region into the file cache.
pub(crate) struct RegionLoadCacheTask {
    region: MitoRegionRef,
}

impl RegionLoadCacheTask {
    pub(crate) fn new(region: MitoRegionRef) -> Self {
        Self { region }
    }

    /// Downloads the index files of the region's SSTs into the given file cache.
    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
        let region_id = self.region.region_id;
        let table_dir = self.region.access_layer.table_dir();
        let path_type = self.region.access_layer.path_type();
        let object_store = self.region.access_layer.object_store();
        let version_control = &self.region.version_control;

        let mut files_to_download = Vec::new();
        let mut files_already_cached = 0;

        {
            let version = version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    let file_meta = file_handle.meta_ref();
                    if file_meta.exists_index() {
                        let puffin_key = IndexKey::new(
                            file_meta.region_id,
                            file_meta.file_id,
                            FileType::Puffin(file_meta.index_version),
                        );

                        if !file_cache.contains_key(&puffin_key) {
                            files_to_download.push((
                                puffin_key,
                                file_meta.index_file_size,
                                file_meta.time_range.1,
                            ));
                        } else {
                            files_already_cached += 1;
                        }
                    }
                }
            }
        }

        // Sorts by the max timestamp of each file so that newer files are downloaded first.
        files_to_download.sort_by(|a, b| b.2.cmp(&a.2));

        let total_files = files_to_download.len() as i64;

        info!(
            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
            region_id, total_files, files_already_cached
        );

        CACHE_FILL_PENDING_FILES.add(total_files);

        let mut files_downloaded = 0;
        let mut files_skipped = 0;

        for (puffin_key, file_size, max_timestamp) in files_to_download {
            let current_size = file_cache.puffin_cache_size();
            let capacity = file_cache.puffin_cache_capacity();
            let region_state = self.region.state();
            if !can_load_cache(region_state) {
                info!(
                    "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
                    region_state, region_id, current_size, capacity
                );
                break;
            }

            if current_size + file_size > capacity {
                info!(
                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
                );
                files_skipped = (total_files - files_downloaded) as usize;
                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
                break;
            }

            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
                version
            } else {
                unreachable!("`files_to_download` should only contain Puffin files");
            };
            let index_id = RegionIndexId::new(
                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
                index_version,
            );

            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);

            match file_cache
                .download(puffin_key, &index_remote_path, object_store, file_size)
                .await
            {
                Ok(_) => {
                    debug!(
                        "Downloaded index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    files_downloaded += 1;
                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
                    CACHE_FILL_PENDING_FILES.dec();
                }
                Err(e) => {
                    warn!(
                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    CACHE_FILL_PENDING_FILES.dec();
                }
            }
        }

        info!(
            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
            region_id, total_files, files_downloaded, files_already_cached, files_skipped
        );
    }
}

/// Schedules a [RegionLoadCacheTask] for the region if index preloading and the
/// write cache are both enabled.
fn maybe_load_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    let Some(write_cache) = cache_manager.write_cache() else {
        return;
    };

    let preload_enabled = config.preload_index_cache;
    if !preload_enabled {
        return;
    }

    let task = RegionLoadCacheTask::new(region.clone());
    write_cache.load_region_cache(task);
}

fn can_load_cache(state: RegionRoleState) -> bool {
    match state {
        RegionRoleState::Leader(RegionLeaderState::Writable)
        | RegionRoleState::Leader(RegionLeaderState::Staging)
        | RegionRoleState::Leader(RegionLeaderState::Altering)
        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
        | RegionRoleState::Leader(RegionLeaderState::Editing)
        | RegionRoleState::Follower => true,
        RegionRoleState::Leader(RegionLeaderState::Downgrading)
        | RegionRoleState::Leader(RegionLeaderState::Dropping)
        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
    }
}