use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, LazyLock, Mutex};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::normalize_dir;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};
use tokio::sync::Semaphore;

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCache, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;

static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
    LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));

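/// Fetches the partition expression (as a JSON string) for a region, returning
/// `None` when no expression is available.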
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

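/// A builder that creates a new region or opens an existing region.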
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
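    /// Returns a new opener.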
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

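    /// Sets the metadata builder that supplies the expected region metadata.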
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

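    /// Returns the directory of the region under the table directory.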
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

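    /// Builds the region metadata, applying the primary key encoding from the options.
    ///
    /// Both `options` and `metadata_builder` must be set before calling this.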
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

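    /// Parses the region options from raw key-value pairs and sets them.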
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

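    /// Sets the replay checkpoint; WAL replay starts from the max of this value and
    /// the flushed entry id.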
    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

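    /// Sets a custom WAL entry reader; if unset, a reader is created from the WAL.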
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

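    /// Validates and sets the region options.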
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

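    /// Sets the cache manager for the region.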
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

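    /// Sets whether to skip WAL replay when opening the region.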
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

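    /// Opens the region if it exists, otherwise creates it.
    ///
    /// When an existing region is opened, its recovered metadata must match the
    /// metadata built from the metadata builder, and the region is set to leader.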
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        let mut options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_experimental_flat_format {
            options.sst_format = Some(FormatType::Flat);
            FormatType::Flat
        } else {
            options.sst_format = Some(FormatType::PrimaryKey);
            FormatType::PrimaryKey
        };
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &region_dir, &object_store);
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            sst_format,
            &self.stats,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!(
            "Create region {} with options: {:?}, default_flat_format: {}",
            region_id, options, config.default_experimental_flat_format
        );

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(Arc::new(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_file_purger(
                config.gc.enable,
                self.path_type,
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
            staging_partition_info: Mutex::new(None),
        }))
    }

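    /// Opens an existing region and returns an error if the region directory is empty
    /// or the recovered region id does not match.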
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .with_context(|| EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

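    /// Returns the WAL provider for the region and ensures the region's WAL options
    /// are compatible with the global log store implementation `S`.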
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

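    /// Tries to open the region; returns `Ok(None)` if the region manifest is not found.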
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegionRef>> {
        let now = Instant::now();
        let mut region_options = self.options.as_ref().unwrap().clone();
        let object_storage =
            get_object_store(&region_options.storage, &self.object_store_manager)?;
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &self.stats).await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };
        sanitize_region_options(&manifest, &mut region_options);

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_file_purger(
            config.gc.enable,
            self.path_type,
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            file_purger.clone(),
            mutable,
            region_options,
        );
        let version = version_builder.build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            // For a remote WAL, when replay left the memtables empty, record the
            // topic's latest entry id; otherwise store 0.
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control: version_control.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats.clone(),
            staging_partition_info: Mutex::new(None),
        };

        let region = Arc::new(region);

        maybe_load_cache(&region, config, &self.cache_manager);
        maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);

        Ok(Some(region))
    }
}

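/// Builds a [VersionBuilder] from the region manifest, attaching the manifest's SST
/// files, flushed ids and compaction time window.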
pub(crate) fn version_builder_from_manifest(
    manifest: &RegionManifest,
    metadata: RegionMetadataRef,
    file_purger: FilePurgerRef,
    mutable: TimePartitionsRef,
    region_options: RegionOptions,
) -> VersionBuilder {
    VersionBuilder::new(metadata, mutable)
        .add_files(file_purger, manifest.files.values().cloned())
        .flushed_entry_id(manifest.flushed_entry_id)
        .flushed_sequence(manifest.flushed_sequence)
        .truncated_entry_id(manifest.truncated_entry_id)
        .compaction_time_window(manifest.compaction_time_window)
        .options(region_options)
}

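/// Reconciles the region options with the manifest: the manifest's SST format and
/// `append_mode` take precedence, and `merge_mode` is dropped when append mode is on.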
pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
    let option_format = options.sst_format.unwrap_or_default();
    if option_format != manifest.sst_format {
        common_telemetry::warn!(
            "Overriding SST format from {:?} to {:?} for region {}",
            option_format,
            manifest.sst_format,
            manifest.metadata.region_id,
        );
        options.sst_format = Some(manifest.sst_format);
    }
    if let Some(manifest_append_mode) = manifest.append_mode
        && options.append_mode != manifest_append_mode
    {
        common_telemetry::warn!(
            "Overriding append_mode from {} to {} for region {}",
            options.append_mode,
            manifest_append_mode,
            manifest.metadata.region_id,
        );
        options.append_mode = manifest_append_mode;
    }
    if options.append_mode && options.merge_mode.take().is_some() {
        common_telemetry::warn!(
            "Ignoring merge_mode because append_mode is enabled for region {}",
            manifest.metadata.region_id,
        );
    }
}

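/// Removes `merge_mode` from raw open request options when `append_mode` is enabled.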
pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
    let append_mode_enabled = options
        .get("append_mode")
        .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));

    if append_mode_enabled && options.remove("merge_mode").is_some() {
        common_telemetry::warn!(
            "Ignoring merge_mode in open request options because append_mode is enabled"
        );
    }
}

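/// Returns the object store named `name`, or the default object store if `name` is `None`.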
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

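/// Checks that the recovered region metadata matches the expected region id, column
/// metadatas and primary key.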
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

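/// Replays the WAL into the region's memtables, starting after `flushed_entry_id`,
/// and returns the last entry id applied.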
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

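    // Run the on-region-opened hook with the flushed entry id now that replay is done.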
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

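/// Background task that fills the write cache with a region's index (puffin) files.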
pub(crate) struct RegionLoadCacheTask {
    region: MitoRegionRef,
}

impl RegionLoadCacheTask {
    pub(crate) fn new(region: MitoRegionRef) -> Self {
        Self { region }
    }

    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
        let region_id = self.region.region_id;
        let table_dir = self.region.access_layer.table_dir();
        let path_type = self.region.access_layer.path_type();
        let object_store = self.region.access_layer.object_store();
        let version_control = &self.region.version_control;

        let mut files_to_download = Vec::new();
        let mut files_already_cached = 0;

        {
            let version = version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    let file_meta = file_handle.meta_ref();
                    if file_meta.exists_index() {
                        let puffin_key = IndexKey::new(
                            file_meta.region_id,
                            file_meta.file_id,
                            FileType::Puffin(file_meta.index_version),
                        );

                        if !file_cache.contains_key(&puffin_key) {
                            files_to_download.push((
                                puffin_key,
                                file_meta.index_file_size,
                                file_meta.time_range.1,
                            ));
                        } else {
                            files_already_cached += 1;
                        }
                    }
                }
            }
        }

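        // Download newer files first: sort by max timestamp in descending order.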
        files_to_download.sort_by(|a, b| b.2.cmp(&a.2));

        let total_files = files_to_download.len() as i64;

        info!(
            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
            region_id, total_files, files_already_cached
        );

        CACHE_FILL_PENDING_FILES.add(total_files);

        let mut files_downloaded = 0;
        let mut files_skipped = 0;

        for (puffin_key, file_size, max_timestamp) in files_to_download {
            let current_size = file_cache.puffin_cache_size();
            let capacity = file_cache.puffin_cache_capacity();
            let region_state = self.region.state();
            if !can_load_cache(region_state) {
                info!(
                    "Stopping index cache preload due to region state: {:?}, region: {}, current_size: {}, capacity: {}",
                    region_state, region_id, current_size, capacity
                );
                break;
            }

            if current_size + file_size > capacity {
                info!(
                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
                );
                files_skipped = (total_files - files_downloaded) as usize;
                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
                break;
            }

            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
                version
            } else {
                unreachable!("`files_to_download` should only contain Puffin files");
            };
            let index_id = RegionIndexId::new(
                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
                index_version,
            );

            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);

            match file_cache
                .download(puffin_key, &index_remote_path, object_store, file_size)
                .await
            {
                Ok(_) => {
                    debug!(
                        "Downloaded index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    files_downloaded += 1;
                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
                    CACHE_FILL_PENDING_FILES.dec();
                }
                Err(e) => {
                    warn!(
                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    CACHE_FILL_PENDING_FILES.dec();
                }
            }
        }

        info!(
            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
            region_id, total_files, files_downloaded, files_already_cached, files_skipped
        );
    }
}

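/// Schedules a background index cache fill task for the region when index preloading
/// is enabled and a write cache is configured.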
fn maybe_load_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    let Some(write_cache) = cache_manager.write_cache() else {
        return;
    };

    let preload_enabled = config.preload_index_cache;
    if !preload_enabled {
        return;
    }

    let task = RegionLoadCacheTask::new(region.clone());
    write_cache.load_region_cache(task);
}

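/// Preloads parquet metadata for `files` into the SST metadata cache, newest files
/// first, until the cache reaches `sst_meta_cache_capacity`.
///
/// Metadata is fetched through the cache manager first; a direct load from the object
/// store is attempted only for local filesystem stores. Returns the number of files
/// whose metadata was loaded into the in-memory cache.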
async fn preload_parquet_meta_cache_for_files(
    region_id: RegionId,
    cache_manager: CacheManagerRef,
    sst_meta_cache_capacity: u64,
    table_dir: String,
    path_type: PathType,
    object_store: object_store::ObjectStore,
    mut files: Vec<FileHandle>,
) -> usize {
    if !cache_manager.sst_meta_cache_enabled()
        || sst_meta_cache_capacity == 0
        || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
    {
        return 0;
    }

    let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);

    files.sort_by(|a, b| b.meta_ref().time_range.1.cmp(&a.meta_ref().time_range.1));

    let mut loaded = 0usize;
    for file_handle in files {
        if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
            break;
        }

        let file_id = file_handle.file_id();
        let mut cache_metrics = MetadataCacheMetrics::default();
        if cache_manager
            .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
            .await
            .is_some()
        {
            if cache_metrics.mem_cache_hit == 0 {
                loaded += 1;
            }
            continue;
        }

        if !allow_direct_load {
            continue;
        }

        let file_size = file_handle.meta_ref().file_size;
        let file_path = file_handle.file_path(&table_dir, path_type);
        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
        match loader.load(&mut cache_metrics).await {
            Ok(metadata) => {
                cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
                loaded += 1;
            }
            Err(err) => {
                warn!(
                    err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
                    region_id, file_path
                );
            }
        }
    }

    loaded
}

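/// Spawns a background task that preloads parquet metadata for all of the region's
/// SST files; a global semaphore bounds how many regions preload concurrently.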
fn maybe_preload_parquet_meta_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    if !cache_manager.sst_meta_cache_enabled() {
        return;
    }

    if config.sst_meta_cache_size.as_bytes() == 0 {
        return;
    }
    if !config.preload_index_cache {
        return;
    }

    let region = region.clone();
    let cache_manager = cache_manager.clone();
    let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();

    tokio::spawn(async move {
        let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();

        let region_id = region.region_id;
        let table_dir = region.access_layer.table_dir().to_string();
        let path_type = region.access_layer.path_type();
        let object_store = region.access_layer.object_store().clone();

        let mut files = Vec::new();
        {
            let version = region.version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    files.push(file_handle.clone());
                }
            }
        }
        let preloading_start = Instant::now();
        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager,
            sst_meta_cache_capacity,
            table_dir,
            path_type,
            object_store,
            files,
        )
        .await;
        let preloading_cost = preloading_start.elapsed();

        if loaded > 0 {
            info!(
                "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
                region_id,
                loaded,
                preloading_cost.as_millis()
            );
        }
    });
}

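/// Returns whether the region's current state allows a cache fill task to keep running.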
fn can_load_cache(state: RegionRoleState) -> bool {
    match state {
        RegionRoleState::Leader(RegionLeaderState::Writable)
        | RegionRoleState::Leader(RegionLeaderState::Staging)
        | RegionRoleState::Leader(RegionLeaderState::Altering)
        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
        | RegionRoleState::Leader(RegionLeaderState::Editing)
        | RegionRoleState::Follower => true,
        RegionRoleState::Leader(RegionLeaderState::Downgrading)
        | RegionRoleState::Leader(RegionLeaderState::Dropping)
        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_test_util::temp_dir::create_temp_dir;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use object_store::ObjectStore;
    use object_store::services::{Fs, Memory};
    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::preload_parquet_meta_cache_for_files;
    use crate::cache::CacheManager;
    use crate::cache::file_cache::{FileType, IndexKey};
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::parquet::PARQUET_METADATA_KEY;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::sst_region_metadata;

    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
        let key_value_meta = KeyValue::new(
            PARQUET_METADATA_KEY.to_string(),
            sst_region_metadata().to_json().unwrap(),
        );
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .build();

        let mut parquet_bytes = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();

        parquet_bytes
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_uses_file_cache() {
        let env = TestEnv::new().await;

        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let write_cache = env
            .create_write_cache(local_store, ReadableSize::mb(1024))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);
        let file_size = parquet_bytes.len() as u64;

        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
        source_store
            .write(&remote_path, parquet_bytes)
            .await
            .unwrap();

        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache
            .file_cache()
            .download(index_key, &remote_path, &source_store, file_size)
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            source_store.clone(),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        object_store
            .write(&remote_path, b"noop".as_slice())
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 0);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);

        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(table_dir, path_type);

        let root = create_temp_dir("parquet-meta-preload");
        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
            .unwrap()
            .finish();
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
    }
}
1414}