use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::normalize_dir;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};
use tokio::sync::Semaphore;

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCache, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range};
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;

static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
    LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;
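
/// Builder for creating or opening a [MitoRegion].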
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
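    /// Returns a new opener for the region under `table_dir`.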
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }
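
    /// Creates the region if it does not exist, otherwise opens it.
    ///
    /// A minimal usage sketch of the builder flow; the surrounding
    /// `metadata_builder`, `request_options`, `cache_manager`, `config` and
    /// `wal` handles are placeholders for illustration only:
    ///
    /// ```ignore
    /// let region = opener
    ///     .metadata_builder(metadata_builder)
    ///     .parse_options(request_options)?
    ///     .cache(Some(cache_manager))
    ///     .create_or_open(&config, &wal)
    ///     .await?;
    /// ```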
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        let mut options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_flat_format {
            options.sst_format = Some(FormatType::Flat);
            FormatType::Flat
        } else {
            options.sst_format = Some(FormatType::PrimaryKey);
            FormatType::PrimaryKey
        };
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &region_dir, &object_store);
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            sst_format,
            &self.stats,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!(
            "Create region {} with options: {:?}, default_flat_format: {}",
            region_id, options, config.default_flat_format
        );

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(Arc::new(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_file_purger(
                config.gc.enable,
                self.path_type,
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
        }))
    }
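
    /// Opens an existing region; returns an error if the region directory is empty.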
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegionRef> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .with_context(|| EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }
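
    /// Returns the WAL [Provider] for the region's WAL options, ensuring the
    /// configured log store type is compatible with those options.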
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
                        || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }
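
    /// Tries to open the region; returns `Ok(None)` if no manifest exists under
    /// the region directory.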
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegionRef>> {
        let now = Instant::now();
        let mut region_options = self.options.as_ref().unwrap().clone();
        let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
        let mut region_manifest_options =
            RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
        region_manifest_options.manifest_cache = self
            .cache_manager
            .as_ref()
            .and_then(|cm| cm.write_cache())
            .and_then(|wc| wc.manifest_cache());
        let Some(manifest_manager) =
            RegionManifestManager::open(region_manifest_options, &self.stats).await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };
        sanitize_region_options(&manifest, &mut region_options);

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_file_purger(
            config.gc.enable,
            self.path_type,
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        let version_builder = version_builder_from_manifest(
            &manifest,
            metadata,
            file_purger.clone(),
            mutable,
            region_options,
        );
        let version = version_builder.build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
                region_id,
                manifest.manifest_version,
                flushed_entry_id,
                now.elapsed()
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control: version_control.clone(),
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats.clone(),
        };

        let region = Arc::new(region);

        maybe_load_cache(&region, config, &self.cache_manager);
        maybe_preload_parquet_meta_cache(&region, config, &self.cache_manager);

        Ok(Some(region))
    }
}
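
/// Builds a [VersionBuilder] from the persisted [RegionManifest].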
pub(crate) fn version_builder_from_manifest(
    manifest: &RegionManifest,
    metadata: RegionMetadataRef,
    file_purger: FilePurgerRef,
    mutable: TimePartitionsRef,
    region_options: RegionOptions,
) -> VersionBuilder {
    VersionBuilder::new(metadata, mutable)
        .add_files(file_purger, manifest.files.values().cloned())
        .flushed_entry_id(manifest.flushed_entry_id)
        .flushed_sequence(manifest.flushed_sequence)
        .truncated_entry_id(manifest.truncated_entry_id)
        .compaction_time_window(manifest.compaction_time_window)
        .options(region_options)
}
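
/// Aligns region options with the persisted manifest: the manifest's SST format
/// and append mode take precedence, and `merge_mode` is dropped when append mode
/// is enabled.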
pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
    let option_format = options.sst_format.unwrap_or_default();
    if option_format != manifest.sst_format {
        common_telemetry::warn!(
            "Overriding SST format from {:?} to {:?} for region {}",
            option_format,
            manifest.sst_format,
            manifest.metadata.region_id,
        );
    }
    options.sst_format = Some(manifest.sst_format);

    if let Some(manifest_append_mode) = manifest.append_mode
        && options.append_mode != manifest_append_mode
    {
        common_telemetry::warn!(
            "Overriding append_mode from {} to {} for region {}",
            options.append_mode,
            manifest_append_mode,
            manifest.metadata.region_id,
        );
        options.append_mode = manifest_append_mode;
    }
    if options.append_mode && options.merge_mode.take().is_some() {
        common_telemetry::warn!(
            "Ignoring merge_mode because append_mode is enabled for region {}",
            manifest.metadata.region_id,
        );
    }
}
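
/// Sanitizes raw open request options, removing `merge_mode` when `append_mode`
/// is enabled.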
pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
    let append_mode_enabled = options
        .get("append_mode")
        .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));

    if append_mode_enabled && options.remove("merge_mode").is_some() {
        common_telemetry::warn!(
            "Ignoring merge_mode in open request options because append_mode is enabled"
        );
    }
}
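
/// Returns the object store with the given `name`, or the default object store
/// if `name` is `None`.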
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}
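
/// Checks that the recovered metadata matches the expected region id, column
/// metadatas and primary key.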
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}
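
/// Replays WAL entries after `flushed_entry_id` into the memtable and returns
/// the last entry id replayed.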
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}
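
/// Background task that downloads index (puffin) files of a region into the
/// write cache.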
pub(crate) struct RegionLoadCacheTask {
    region: MitoRegionRef,
}

impl RegionLoadCacheTask {
    pub(crate) fn new(region: MitoRegionRef) -> Self {
        Self { region }
    }
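
    /// Downloads uncached index files into `file_cache`, newest time range first,
    /// stopping when the cache capacity is reached or the region leaves a
    /// cache-loadable state.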
    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
        let region_id = self.region.region_id;
        let table_dir = self.region.access_layer.table_dir();
        let path_type = self.region.access_layer.path_type();
        let object_store = self.region.access_layer.object_store();
        let version_control = &self.region.version_control;

        let mut files_to_download = Vec::new();
        let mut files_already_cached = 0;

        {
            let version = version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    let file_meta = file_handle.meta_ref();
                    if file_meta.exists_index() {
                        let puffin_key = IndexKey::new(
                            file_meta.region_id,
                            file_meta.file_id,
                            FileType::Puffin(file_meta.index_version),
                        );

                        if !file_cache.contains_key(&puffin_key) {
                            files_to_download.push((
                                puffin_key,
                                file_meta.index_file_size,
                                file_meta.time_range.1,
                            ));
                        } else {
                            files_already_cached += 1;
                        }
                    }
                }
            }
        }

        files_to_download.sort_by_key(|b| std::cmp::Reverse(b.2));

        let total_files = files_to_download.len() as i64;

        info!(
            "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
            region_id, total_files, files_already_cached
        );

        CACHE_FILL_PENDING_FILES.add(total_files);

        let mut files_downloaded = 0;
        let mut files_skipped = 0;

        for (puffin_key, file_size, max_timestamp) in files_to_download {
            let current_size = file_cache.puffin_cache_size();
            let capacity = file_cache.puffin_cache_capacity();
            let region_state = self.region.state();
            if !can_load_cache(region_state) {
                info!(
                    "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
                    region_state, region_id, current_size, capacity
                );
                break;
            }

            if current_size + file_size > capacity {
                info!(
                    "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
                    region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
                );
                files_skipped = (total_files - files_downloaded) as usize;
                CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
                break;
            }

            let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
                version
            } else {
                unreachable!("`files_to_download` should only contain Puffin files");
            };
            let index_id = RegionIndexId::new(
                RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
                index_version,
            );

            let index_remote_path = location::index_file_path(table_dir, index_id, path_type);

            match file_cache
                .download(puffin_key, &index_remote_path, object_store, file_size)
                .await
            {
                Ok(_) => {
                    debug!(
                        "Downloaded index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    files_downloaded += 1;
                    CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
                    CACHE_FILL_PENDING_FILES.dec();
                }
                Err(e) => {
                    warn!(
                        e; "Failed to download index file to write cache, region: {}, file_id: {}",
                        region_id, puffin_key.file_id
                    );
                    CACHE_FILL_PENDING_FILES.dec();
                }
            }
        }

        info!(
            "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
            region_id, total_files, files_downloaded, files_already_cached, files_skipped
        );
    }
}
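
/// Schedules a background index cache preload task for the region when a write
/// cache is present and index preloading is enabled.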
fn maybe_load_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    let Some(write_cache) = cache_manager.write_cache() else {
        return;
    };

    let preload_enabled = config.preload_index_cache;
    if !preload_enabled {
        return;
    }

    let task = RegionLoadCacheTask::new(region.clone());
    write_cache.load_region_cache(task);
}
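
/// Preloads parquet metadata of `files` into the SST metadata cache, newest time
/// range first, until `sst_meta_cache_capacity` is reached. Metadata is fetched
/// through the cache manager (which may serve it from the write cache); reading
/// it directly from the object store is only attempted for local filesystem
/// stores. Returns the number of files whose metadata was newly loaded.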
#[allow(clippy::too_many_arguments)]
async fn preload_parquet_meta_cache_for_files(
    region_id: RegionId,
    cache_manager: CacheManagerRef,
    sst_meta_cache_capacity: u64,
    table_dir: String,
    path_type: PathType,
    object_store: object_store::ObjectStore,
    region_metadata: RegionMetadataRef,
    mut files: Vec<FileHandle>,
) -> usize {
    if !cache_manager.sst_meta_cache_enabled()
        || sst_meta_cache_capacity == 0
        || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
    {
        return 0;
    }

    let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);

    files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));

    let mut loaded = 0usize;
    for file_handle in files {
        if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
            break;
        }

        let file_id = file_handle.file_id();
        let mut cache_metrics = MetadataCacheMetrics::default();
        if let Some(metadata) = cache_manager
            .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
            .await
        {
            if file_handle.primary_key_range().is_none()
                && let Some(primary_key_range) =
                    extract_primary_key_range(&metadata, &region_metadata)
            {
                file_handle.set_primary_key_range(primary_key_range);
            }
            if cache_metrics.mem_cache_hit == 0 {
                loaded += 1;
            }
            continue;
        }

        if !allow_direct_load {
            continue;
        }

        let file_size = file_handle.meta_ref().file_size;
        let file_path = file_handle.file_path(&table_dir, path_type);
        let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
        match loader.load(&mut cache_metrics).await {
            Ok(metadata) => {
                if let Some(primary_key_range) =
                    extract_primary_key_range(&metadata, &region_metadata)
                {
                    file_handle.set_primary_key_range(primary_key_range);
                }
                cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
                loaded += 1;
            }
            Err(err) => {
                warn!(
                    err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
                    region_id, file_path
                );
            }
        }
    }

    loaded
}
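
/// Spawns a background task that preloads parquet metadata for all SST files of
/// the region, limited by `PARQUET_META_PRELOAD_SEMAPHORE`.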
fn maybe_preload_parquet_meta_cache(
    region: &MitoRegionRef,
    config: &MitoConfig,
    cache_manager: &Option<CacheManagerRef>,
) {
    let Some(cache_manager) = cache_manager else {
        return;
    };
    if !cache_manager.sst_meta_cache_enabled() {
        return;
    }

    if config.sst_meta_cache_size.as_bytes() == 0 {
        return;
    }
    if !config.preload_index_cache {
        return;
    }

    let region = region.clone();
    let cache_manager = cache_manager.clone();
    let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();

    tokio::spawn(async move {
        let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();

        let region_id = region.region_id;
        let table_dir = region.access_layer.table_dir().to_string();
        let path_type = region.access_layer.path_type();
        let object_store = region.access_layer.object_store().clone();
        let region_metadata = region.version_control.current().version.metadata.clone();

        let mut files = Vec::new();
        {
            let version = region.version_control.current().version;
            for level in version.ssts.levels() {
                for file_handle in level.files.values() {
                    files.push(file_handle.clone());
                }
            }
        }
        let preloading_start = Instant::now();
        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager,
            sst_meta_cache_capacity,
            table_dir,
            path_type,
            object_store,
            region_metadata,
            files,
        )
        .await;
        let preloading_cost = preloading_start.elapsed();

        if loaded > 0 {
            info!(
                "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
                region_id,
                loaded,
                preloading_cost.as_millis()
            );
        }
    });
}
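
/// Returns whether the region state still allows cache preloading to proceed.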
fn can_load_cache(state: RegionRoleState) -> bool {
    match state {
        RegionRoleState::Leader(RegionLeaderState::Writable)
        | RegionRoleState::Leader(RegionLeaderState::Staging)
        | RegionRoleState::Leader(RegionLeaderState::Altering)
        | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
        | RegionRoleState::Leader(RegionLeaderState::Editing)
        | RegionRoleState::Follower => true,
        RegionRoleState::Leader(RegionLeaderState::Downgrading)
        | RegionRoleState::Leader(RegionLeaderState::Dropping)
        | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_test_util::temp_dir::create_temp_dir;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use object_store::ObjectStore;
    use object_store::services::{Fs, Memory};
    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::preload_parquet_meta_cache_for_files;
    use crate::cache::CacheManager;
    use crate::cache::file_cache::{FileType, IndexKey};
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::parquet::PARQUET_METADATA_KEY;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::sst_region_metadata;

    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
        let key_value_meta = KeyValue::new(
            PARQUET_METADATA_KEY.to_string(),
            sst_region_metadata().to_json().unwrap(),
        );
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .build();

        let mut parquet_bytes = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();

        parquet_bytes
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_uses_file_cache() {
        let env = TestEnv::new().await;

        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let write_cache = env
            .create_write_cache(local_store, ReadableSize::mb(1024))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([
            ("col", col),
            (
                store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME,
                primary_key,
            ),
        ])
        .unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);
        let file_size = parquet_bytes.len() as u64;

        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
        source_store
            .write(&remote_path, parquet_bytes)
            .await
            .unwrap();

        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache
            .file_cache()
            .download(index_key, &remote_path, &source_store, file_size)
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            source_store.clone(),
            Arc::new(sst_region_metadata()),
            vec![file_handle.clone()],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
        assert!(file_handle.primary_key_range().is_some());
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        object_store
            .write(&remote_path, b"noop".as_slice())
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 0);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);

        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(table_dir, path_type);

        let root = create_temp_dir("parquet-meta-preload");
        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
            .unwrap()
            .finish();
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
    }
}