1use std::any::TypeId;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicI64, AtomicU64};
20use std::sync::{Arc, LazyLock};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, warn};
24use common_wal::options::WalOptions;
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use log_store::kafka::log_store::KafkaLogStore;
28use log_store::noop::log_store::NoopLogStore;
29use log_store::raft_engine::log_store::RaftEngineLogStore;
30use object_store::manager::ObjectStoreManagerRef;
31use object_store::util::normalize_dir;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::logstore::LogStore;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::{
36 ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
37};
38use store_api::region_engine::RegionRole;
39use store_api::region_request::PathType;
40use store_api::storage::{ColumnId, RegionId};
41use tokio::sync::Semaphore;
42
43use crate::access_layer::AccessLayer;
44use crate::cache::CacheManagerRef;
45use crate::cache::file_cache::{FileCache, FileType, IndexKey};
46use crate::config::MitoConfig;
47use crate::error;
48use crate::error::{
49 EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
50 Result, StaleLogEntrySnafu,
51};
52use crate::manifest::action::RegionManifest;
53use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
54use crate::memtable::MemtableBuilderProvider;
55use crate::memtable::bulk::part::BulkPart;
56use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
57use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
58use crate::region::options::RegionOptions;
59use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
60use crate::region::{
61 ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
62};
63use crate::region_write_ctx::RegionWriteCtx;
64use crate::request::OptionOutputTx;
65use crate::schedule::scheduler::SchedulerRef;
66use crate::sst::FormatType;
67use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
68use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
69use crate::sst::file_ref::FileReferenceManagerRef;
70use crate::sst::index::intermediate::IntermediateManager;
71use crate::sst::index::puffin_manager::PuffinManagerFactory;
72use crate::sst::location::{self, region_dir_from_table_dir};
73use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range};
74use crate::sst::parquet::reader::MetadataCacheMetrics;
75use crate::time_provider::TimeProviderRef;
76use crate::wal::entry_reader::WalEntryReader;
77use crate::wal::{EntryId, Wal};
78
/// Number of permits of [`PARQUET_META_PRELOAD_SEMAPHORE`], i.e. how many parquet
/// metadata preload tasks may hold a permit at the same time.
const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;

/// Process-wide semaphore acquired by every spawned parquet metadata preload task
/// before it starts working. It is created lazily on first use.
static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
    LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
83
/// Asynchronous source of a region's partition expression.
///
/// The region opener consults it when the metadata recovered from the manifest
/// carries no partition expression.
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    /// Returns the partition expression of `region_id`, or `None` when there is none.
    ///
    /// NOTE(review): the opener stores the returned string through
    /// `partition_expr_json`, so it is presumably JSON encoded — confirm with implementors.
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}
93
/// Shared, thread-safe handle to a [`PartitionExprFetcher`].
pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;
95
/// Builder that creates a new region or opens an existing one.
pub(crate) struct RegionOpener {
    /// Id of the region to create or open.
    region_id: RegionId,
    /// Builder of the expected region metadata; consumed by `build_metadata()`.
    metadata_builder: Option<RegionMetadataBuilder>,
    /// Provides the memtable builder that matches the region options.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Used to look up the object store named in the region options.
    object_store_manager: ObjectStoreManagerRef,
    /// Table directory, normalized by `normalize_dir()` in the constructor.
    table_dir: String,
    /// Combined with `table_dir` and `region_id` to compute the region directory.
    path_type: PathType,
    /// Scheduler handed to the file purger.
    purge_scheduler: SchedulerRef,
    /// Region options; must be set before creating or opening the region.
    options: Option<RegionOptions>,
    /// Optional cache manager (manifest cache, write cache, SST meta cache).
    cache_manager: Option<CacheManagerRef>,
    /// When `true`, opening the region does not replay the WAL.
    skip_wal_replay: bool,
    /// Passed to the access layer of the region.
    puffin_manager_factory: PuffinManagerFactory,
    /// Passed to the access layer of the region.
    intermediate_manager: IntermediateManager,
    /// Source of the current time used to initialize the region's timestamps.
    time_provider: TimeProviderRef,
    /// Manifest statistics shared with the manifest manager and the region.
    stats: ManifestStats,
    /// Optional reader used for WAL replay instead of the one created from the WAL.
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    /// Optional entry id; replay starts from the larger of it and the flushed entry id.
    replay_checkpoint: Option<u64>,
    /// Passed to the file purger.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetches the partition expression when the manifest metadata has none.
    partition_expr_fetcher: PartitionExprFetcherRef,
}
117
118impl RegionOpener {
119 #[allow(clippy::too_many_arguments)]
122 pub(crate) fn new(
123 region_id: RegionId,
124 table_dir: &str,
125 path_type: PathType,
126 memtable_builder_provider: MemtableBuilderProvider,
127 object_store_manager: ObjectStoreManagerRef,
128 purge_scheduler: SchedulerRef,
129 puffin_manager_factory: PuffinManagerFactory,
130 intermediate_manager: IntermediateManager,
131 time_provider: TimeProviderRef,
132 file_ref_manager: FileReferenceManagerRef,
133 partition_expr_fetcher: PartitionExprFetcherRef,
134 ) -> RegionOpener {
135 RegionOpener {
136 region_id,
137 metadata_builder: None,
138 memtable_builder_provider,
139 object_store_manager,
140 table_dir: normalize_dir(table_dir),
141 path_type,
142 purge_scheduler,
143 options: None,
144 cache_manager: None,
145 skip_wal_replay: false,
146 puffin_manager_factory,
147 intermediate_manager,
148 time_provider,
149 stats: Default::default(),
150 wal_entry_reader: None,
151 replay_checkpoint: None,
152 file_ref_manager,
153 partition_expr_fetcher,
154 }
155 }
156
157 pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
159 self.metadata_builder = Some(builder);
160 self
161 }
162
163 fn region_dir(&self) -> String {
165 region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
166 }
167
168 fn build_metadata(&mut self) -> Result<RegionMetadata> {
174 let options = self.options.as_ref().unwrap();
175 let mut metadata_builder = self.metadata_builder.take().unwrap();
176 metadata_builder.primary_key_encoding(options.primary_key_encoding());
177 metadata_builder.build().context(InvalidMetadataSnafu)
178 }
179
180 pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
182 let region_id = self.region_id;
183 self.options(RegionOptions::try_from_options(region_id, &options)?)
184 }
185
186 pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
188 self.replay_checkpoint = replay_checkpoint;
189 self
190 }
191
192 pub(crate) fn wal_entry_reader(
195 mut self,
196 wal_entry_reader: Option<Box<dyn WalEntryReader>>,
197 ) -> Self {
198 self.wal_entry_reader = wal_entry_reader;
199 self
200 }
201
202 pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
204 options.validate()?;
205 self.options = Some(options);
206 Ok(self)
207 }
208
209 pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
211 self.cache_manager = cache_manager;
212 self
213 }
214
215 pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
217 self.skip_wal_replay = skip;
218 self
219 }
220
221 pub(crate) async fn create_or_open<S: LogStore>(
228 mut self,
229 config: &MitoConfig,
230 wal: &Wal<S>,
231 ) -> Result<MitoRegionRef> {
232 let region_id = self.region_id;
233 let region_dir = self.region_dir();
234 let metadata = self.build_metadata()?;
235 match self.maybe_open(config, wal).await {
237 Ok(Some(region)) => {
238 let recovered = region.metadata();
239 let expect = &metadata;
241 check_recovered_region(
242 &recovered,
243 expect.region_id,
244 &expect.column_metadatas,
245 &expect.primary_key,
246 )?;
247 region.set_role(RegionRole::Leader);
249
250 return Ok(region);
251 }
252 Ok(None) => {
253 debug!(
254 "No data under directory {}, region_id: {}",
255 region_dir, self.region_id
256 );
257 }
258 Err(e) => {
259 warn!(e;
260 "Failed to open region {} before creating it, region_dir: {}",
261 self.region_id, region_dir
262 );
263 }
264 }
265 let mut options = self.options.take().unwrap();
267 let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
268 let provider = self.provider::<S>(&options.wal_options)?;
269 let metadata = Arc::new(metadata);
270 let sst_format = if let Some(format) = options.sst_format {
272 format
273 } else if config.default_flat_format {
274 options.sst_format = Some(FormatType::Flat);
275 FormatType::Flat
276 } else {
277 options.sst_format = Some(FormatType::PrimaryKey);
279 FormatType::PrimaryKey
280 };
281 let mut region_manifest_options =
283 RegionManifestOptions::new(config, ®ion_dir, &object_store);
284 region_manifest_options.manifest_cache = self
286 .cache_manager
287 .as_ref()
288 .and_then(|cm| cm.write_cache())
289 .and_then(|wc| wc.manifest_cache());
290 let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
292 let manifest_manager = RegionManifestManager::new(
293 metadata.clone(),
294 flushed_entry_id,
295 region_manifest_options,
296 sst_format,
297 &self.stats,
298 )
299 .await?;
300
301 let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
302 let part_duration = options.compaction.time_window();
303 let mutable = Arc::new(TimePartitions::new(
305 metadata.clone(),
306 memtable_builder.clone(),
307 0,
308 part_duration,
309 ));
310
311 debug!(
312 "Create region {} with options: {:?}, default_flat_format: {}",
313 region_id, options, config.default_flat_format
314 );
315
316 let version = VersionBuilder::new(metadata, mutable)
317 .options(options)
318 .build();
319 let version_control = Arc::new(VersionControl::new(version));
320 let access_layer = Arc::new(AccessLayer::new(
321 self.table_dir.clone(),
322 self.path_type,
323 object_store,
324 self.puffin_manager_factory,
325 self.intermediate_manager,
326 ));
327 let now = self.time_provider.current_time_millis();
328
329 Ok(Arc::new(MitoRegion {
330 region_id,
331 version_control,
332 access_layer: access_layer.clone(),
333 manifest_ctx: Arc::new(ManifestContext::new(
335 manifest_manager,
336 RegionRoleState::Leader(RegionLeaderState::Writable),
337 )),
338 file_purger: create_file_purger(
339 config.gc.enable,
340 self.path_type,
341 self.purge_scheduler,
342 access_layer,
343 self.cache_manager,
344 self.file_ref_manager.clone(),
345 ),
346 provider,
347 last_flush_millis: AtomicI64::new(now),
348 last_schedule_compaction_millis: AtomicI64::new(now),
349 time_provider: self.time_provider.clone(),
350 topic_latest_entry_id: AtomicU64::new(0),
351 written_bytes: Arc::new(AtomicU64::new(0)),
352 stats: self.stats,
353 }))
354 }
355
356 pub(crate) async fn open<S: LogStore>(
360 mut self,
361 config: &MitoConfig,
362 wal: &Wal<S>,
363 ) -> Result<MitoRegionRef> {
364 let region_id = self.region_id;
365 let region_dir = self.region_dir();
366 let region = self
367 .maybe_open(config, wal)
368 .await?
369 .with_context(|| EmptyRegionDirSnafu {
370 region_id,
371 region_dir: ®ion_dir,
372 })?;
373
374 ensure!(
375 region.region_id == self.region_id,
376 RegionCorruptedSnafu {
377 region_id: self.region_id,
378 reason: format!(
379 "recovered region has different region id {}",
380 region.region_id
381 ),
382 }
383 );
384
385 Ok(region)
386 }
387
388 fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
389 match wal_options {
390 WalOptions::RaftEngine => {
391 ensure!(
392 TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
393 || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
394 error::IncompatibleWalProviderChangeSnafu {
395 global: "`kafka`",
396 region: "`raft_engine`",
397 }
398 );
399 Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
400 }
401 WalOptions::Kafka(options) => {
402 ensure!(
403 TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
404 || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
405 error::IncompatibleWalProviderChangeSnafu {
406 global: "`raft_engine`",
407 region: "`kafka`",
408 }
409 );
410 Ok(Provider::kafka_provider(options.topic.clone()))
411 }
412 WalOptions::Noop => Ok(Provider::noop_provider()),
413 }
414 }
415
416 async fn maybe_open<S: LogStore>(
418 &mut self,
419 config: &MitoConfig,
420 wal: &Wal<S>,
421 ) -> Result<Option<MitoRegionRef>> {
422 let now = Instant::now();
423 let mut region_options = self.options.as_ref().unwrap().clone();
424 let object_storage = get_object_store(®ion_options.storage, &self.object_store_manager)?;
425 let mut region_manifest_options =
426 RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
427 region_manifest_options.manifest_cache = self
429 .cache_manager
430 .as_ref()
431 .and_then(|cm| cm.write_cache())
432 .and_then(|wc| wc.manifest_cache());
433 let Some(manifest_manager) =
434 RegionManifestManager::open(region_manifest_options, &self.stats).await?
435 else {
436 return Ok(None);
437 };
438
439 let manifest = manifest_manager.manifest();
441 let metadata = if manifest.metadata.partition_expr.is_none()
442 && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
443 {
444 let metadata = manifest.metadata.as_ref().clone();
445 let mut builder = RegionMetadataBuilder::from_existing(metadata);
446 builder.partition_expr_json(Some(expr_json));
447 Arc::new(builder.build().context(InvalidMetadataSnafu)?)
448 } else {
449 manifest.metadata.clone()
450 };
451 sanitize_region_options(&manifest, &mut region_options);
453
454 let region_id = self.region_id;
455 let provider = self.provider::<S>(®ion_options.wal_options)?;
456 let wal_entry_reader = self
457 .wal_entry_reader
458 .take()
459 .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
460 let on_region_opened = wal.on_region_opened();
461 let object_store = get_object_store(®ion_options.storage, &self.object_store_manager)?;
462
463 debug!(
464 "Open region {} at {} with options: {:?}",
465 region_id, self.table_dir, self.options
466 );
467
468 let access_layer = Arc::new(AccessLayer::new(
469 self.table_dir.clone(),
470 self.path_type,
471 object_store,
472 self.puffin_manager_factory.clone(),
473 self.intermediate_manager.clone(),
474 ));
475 let file_purger = create_file_purger(
476 config.gc.enable,
477 self.path_type,
478 self.purge_scheduler.clone(),
479 access_layer.clone(),
480 self.cache_manager.clone(),
481 self.file_ref_manager.clone(),
482 );
483 let memtable_builder = self
485 .memtable_builder_provider
486 .builder_for_options(®ion_options);
487 let part_duration = region_options
490 .compaction
491 .time_window()
492 .or(manifest.compaction_time_window);
493 let mutable = Arc::new(TimePartitions::new(
495 metadata.clone(),
496 memtable_builder.clone(),
497 0,
498 part_duration,
499 ));
500
501 let version_builder = version_builder_from_manifest(
503 &manifest,
504 metadata,
505 file_purger.clone(),
506 mutable,
507 region_options,
508 );
509 let version = version_builder.build();
510 let flushed_entry_id = version.flushed_entry_id;
511 let version_control = Arc::new(VersionControl::new(version));
512
513 let topic_latest_entry_id = if !self.skip_wal_replay {
514 let replay_from_entry_id = self
515 .replay_checkpoint
516 .unwrap_or_default()
517 .max(flushed_entry_id);
518 info!(
519 "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
520 replay_from_entry_id,
521 region_id,
522 manifest.manifest_version,
523 flushed_entry_id,
524 now.elapsed()
525 );
526 replay_memtable(
527 &provider,
528 wal_entry_reader,
529 region_id,
530 replay_from_entry_id,
531 &version_control,
532 config.allow_stale_entries,
533 on_region_opened,
534 )
535 .await?;
536 if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
540 wal.store().latest_entry_id(&provider).unwrap_or(0)
541 } else {
542 0
543 }
544 } else {
545 info!(
546 "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
547 region_id,
548 manifest.manifest_version,
549 flushed_entry_id,
550 now.elapsed()
551 );
552
553 0
554 };
555
556 if let Some(committed_in_manifest) = manifest.committed_sequence {
557 let committed_after_replay = version_control.committed_sequence();
558 if committed_in_manifest > committed_after_replay {
559 info!(
560 "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
561 self.region_id,
562 version_control.current().version.flushed_sequence,
563 version_control.committed_sequence(),
564 committed_in_manifest
565 );
566 version_control.set_committed_sequence(committed_in_manifest);
567 }
568 }
569
570 let now = self.time_provider.current_time_millis();
571
572 let region = MitoRegion {
573 region_id: self.region_id,
574 version_control: version_control.clone(),
575 access_layer: access_layer.clone(),
576 manifest_ctx: Arc::new(ManifestContext::new(
578 manifest_manager,
579 RegionRoleState::Follower,
580 )),
581 file_purger,
582 provider: provider.clone(),
583 last_flush_millis: AtomicI64::new(now),
584 last_schedule_compaction_millis: AtomicI64::new(now),
585 time_provider: self.time_provider.clone(),
586 topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
587 written_bytes: Arc::new(AtomicU64::new(0)),
588 stats: self.stats.clone(),
589 };
590
591 let region = Arc::new(region);
592
593 maybe_load_cache(®ion, config, &self.cache_manager);
594 maybe_preload_parquet_meta_cache(®ion, config, &self.cache_manager);
595
596 Ok(Some(region))
597 }
598}
599
600pub(crate) fn version_builder_from_manifest(
602 manifest: &RegionManifest,
603 metadata: RegionMetadataRef,
604 file_purger: FilePurgerRef,
605 mutable: TimePartitionsRef,
606 region_options: RegionOptions,
607) -> VersionBuilder {
608 VersionBuilder::new(metadata, mutable)
609 .add_files(file_purger, manifest.files.values().cloned())
610 .flushed_entry_id(manifest.flushed_entry_id)
611 .flushed_sequence(manifest.flushed_sequence)
612 .truncated_entry_id(manifest.truncated_entry_id)
613 .compaction_time_window(manifest.compaction_time_window)
614 .options(region_options)
615}
616
617pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
619 match options.sst_format {
624 Some(format) if format != manifest.sst_format => {
625 common_telemetry::warn!(
626 "Overriding SST format from {:?} (manifest) to {:?} (options) for region {}",
627 manifest.sst_format,
628 format,
629 manifest.metadata.region_id,
630 );
631 }
632 Some(_) => {}
633 None => {
634 options.sst_format = Some(manifest.sst_format);
635 }
636 }
637 if let Some(manifest_append_mode) = manifest.append_mode
638 && options.append_mode != manifest_append_mode
639 {
640 common_telemetry::warn!(
641 "Overriding append_mode from {} to {} for region {}",
642 options.append_mode,
643 manifest_append_mode,
644 manifest.metadata.region_id,
645 );
646 options.append_mode = manifest_append_mode;
647 }
648 if options.append_mode && options.merge_mode.take().is_some() {
649 common_telemetry::warn!(
650 "Ignoring merge_mode because append_mode is enabled for region {}",
651 manifest.metadata.region_id,
652 );
653 }
654}
655
656pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
658 let append_mode_enabled = options
659 .get("append_mode")
660 .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));
661
662 if append_mode_enabled && options.remove("merge_mode").is_some() {
663 common_telemetry::warn!(
664 "Ignoring merge_mode in open request options because append_mode is enabled"
665 );
666 }
667}
668
669pub fn get_object_store(
671 name: &Option<String>,
672 object_store_manager: &ObjectStoreManagerRef,
673) -> Result<object_store::ObjectStore> {
674 if let Some(name) = name {
675 Ok(object_store_manager
676 .find(name)
677 .with_context(|| ObjectStoreNotFoundSnafu {
678 object_store: name.clone(),
679 })?
680 .clone())
681 } else {
682 Ok(object_store_manager.default_object_store().clone())
683 }
684}
685
686pub(crate) fn check_recovered_region(
688 recovered: &RegionMetadata,
689 region_id: RegionId,
690 column_metadatas: &[ColumnMetadata],
691 primary_key: &[ColumnId],
692) -> Result<()> {
693 if recovered.region_id != region_id {
694 error!(
695 "Recovered region {}, expect region {}",
696 recovered.region_id, region_id
697 );
698 return RegionCorruptedSnafu {
699 region_id,
700 reason: format!(
701 "recovered metadata has different region id {}",
702 recovered.region_id
703 ),
704 }
705 .fail();
706 }
707 if recovered.column_metadatas != column_metadatas {
708 error!(
709 "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
710 recovered.region_id, recovered.column_metadatas, column_metadatas
711 );
712
713 return RegionCorruptedSnafu {
714 region_id,
715 reason: "recovered metadata has different schema",
716 }
717 .fail();
718 }
719 if recovered.primary_key != primary_key {
720 error!(
721 "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
722 recovered.region_id, recovered.primary_key, primary_key
723 );
724
725 return RegionCorruptedSnafu {
726 region_id,
727 reason: "recovered metadata has different primary key",
728 }
729 .fail();
730 }
731
732 Ok(())
733}
734
735pub(crate) async fn replay_memtable<F>(
737 provider: &Provider,
738 mut wal_entry_reader: Box<dyn WalEntryReader>,
739 region_id: RegionId,
740 flushed_entry_id: EntryId,
741 version_control: &VersionControlRef,
742 allow_stale_entries: bool,
743 on_region_opened: F,
744) -> Result<EntryId>
745where
746 F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
747{
748 let now = Instant::now();
749 let mut rows_replayed = 0;
750 let mut last_entry_id = flushed_entry_id;
753 let replay_from_entry_id = flushed_entry_id + 1;
754
755 let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
756 while let Some(res) = wal_stream.next().await {
757 let (entry_id, entry) = res?;
758 if entry_id <= flushed_entry_id {
759 warn!(
760 "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
761 region_id, flushed_entry_id, entry_id
762 );
763 ensure!(
764 allow_stale_entries,
765 StaleLogEntrySnafu {
766 region_id,
767 flushed_entry_id,
768 unexpected_entry_id: entry_id,
769 }
770 );
771 }
772 last_entry_id = last_entry_id.max(entry_id);
773
774 let mut region_write_ctx = RegionWriteCtx::new(
775 region_id,
776 version_control,
777 provider.clone(),
778 None,
780 );
781 for mutation in entry.mutations {
782 rows_replayed += mutation
783 .rows
784 .as_ref()
785 .map(|rows| rows.rows.len())
786 .unwrap_or(0);
787 region_write_ctx.push_mutation(
788 mutation.op_type,
789 mutation.rows,
790 mutation.write_hint,
791 OptionOutputTx::none(),
792 Some(mutation.sequence),
794 );
795 }
796
797 for bulk_entry in entry.bulk_entries {
798 let part = BulkPart::try_from(bulk_entry)?;
799 rows_replayed += part.num_rows();
800 let bulk_sequence_from_wal = part.sequence;
802 ensure!(
803 region_write_ctx.push_bulk(
804 OptionOutputTx::none(),
805 part,
806 Some(bulk_sequence_from_wal)
807 ),
808 RegionCorruptedSnafu {
809 region_id,
810 reason: "unable to replay memtable with bulk entries",
811 }
812 );
813 }
814
815 region_write_ctx.set_next_entry_id(last_entry_id + 1);
817 region_write_ctx.write_memtable().await;
818 region_write_ctx.write_bulk().await;
819 }
820
821 (on_region_opened)(region_id, flushed_entry_id, provider).await?;
824
825 let series_count = version_control.current().series_count();
826 info!(
827 "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
828 region_id,
829 provider,
830 rows_replayed,
831 replay_from_entry_id,
832 last_entry_id,
833 series_count,
834 now.elapsed()
835 );
836 Ok(last_entry_id)
837}
838
/// Task that fills the file cache with the index files of a region.
pub(crate) struct RegionLoadCacheTask {
    /// Region whose index files are downloaded.
    region: MitoRegionRef,
}
843
844impl RegionLoadCacheTask {
845 pub(crate) fn new(region: MitoRegionRef) -> Self {
846 Self { region }
847 }
848
849 pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
851 let region_id = self.region.region_id;
852 let table_dir = self.region.access_layer.table_dir();
853 let path_type = self.region.access_layer.path_type();
854 let object_store = self.region.access_layer.object_store();
855 let version_control = &self.region.version_control;
856
857 let mut files_to_download = Vec::new();
859 let mut files_already_cached = 0;
860
861 {
862 let version = version_control.current().version;
863 for level in version.ssts.levels() {
864 for file_handle in level.files.values() {
865 let file_meta = file_handle.meta_ref();
866 if file_meta.exists_index() {
867 let puffin_key = IndexKey::new(
868 file_meta.region_id,
869 file_meta.file_id,
870 FileType::Puffin(file_meta.index_version),
871 );
872
873 if !file_cache.contains_key(&puffin_key) {
874 files_to_download.push((
875 puffin_key,
876 file_meta.index_file_size,
877 file_meta.time_range.1, ));
879 } else {
880 files_already_cached += 1;
881 }
882 }
883 }
884 }
885 }
888
889 files_to_download.sort_by_key(|b| std::cmp::Reverse(b.2));
891
892 let total_files = files_to_download.len() as i64;
893
894 info!(
895 "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
896 region_id, total_files, files_already_cached
897 );
898
899 CACHE_FILL_PENDING_FILES.add(total_files);
900
901 let mut files_downloaded = 0;
902 let mut files_skipped = 0;
903
904 for (puffin_key, file_size, max_timestamp) in files_to_download {
905 let current_size = file_cache.puffin_cache_size();
906 let capacity = file_cache.puffin_cache_capacity();
907 let region_state = self.region.state();
908 if !can_load_cache(region_state) {
909 info!(
910 "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
911 region_state, region_id, current_size, capacity
912 );
913 break;
914 }
915
916 if current_size + file_size > capacity {
918 info!(
919 "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
920 region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
921 );
922 files_skipped = (total_files - files_downloaded) as usize;
923 CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
924 break;
925 }
926
927 let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
928 version
929 } else {
930 unreachable!("`files_to_download` should only contains Puffin files");
931 };
932 let index_id = RegionIndexId::new(
933 RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
934 index_version,
935 );
936
937 let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
938
939 match file_cache
940 .download(puffin_key, &index_remote_path, object_store, file_size)
941 .await
942 {
943 Ok(_) => {
944 debug!(
945 "Downloaded index file to write cache, region: {}, file_id: {}",
946 region_id, puffin_key.file_id
947 );
948 files_downloaded += 1;
949 CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
950 CACHE_FILL_PENDING_FILES.dec();
951 }
952 Err(e) => {
953 warn!(
954 e; "Failed to download index file to write cache, region: {}, file_id: {}",
955 region_id, puffin_key.file_id
956 );
957 CACHE_FILL_PENDING_FILES.dec();
958 }
959 }
960 }
961
962 info!(
963 "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
964 region_id, total_files, files_downloaded, files_already_cached, files_skipped
965 );
966 }
967}
968
969fn maybe_load_cache(
971 region: &MitoRegionRef,
972 config: &MitoConfig,
973 cache_manager: &Option<CacheManagerRef>,
974) {
975 let Some(cache_manager) = cache_manager else {
976 return;
977 };
978 let Some(write_cache) = cache_manager.write_cache() else {
979 return;
980 };
981
982 let preload_enabled = config.preload_index_cache;
983 if !preload_enabled {
984 return;
985 }
986
987 let task = RegionLoadCacheTask::new(region.clone());
988 write_cache.load_region_cache(task);
989}
990
991#[allow(clippy::too_many_arguments)]
1002async fn preload_parquet_meta_cache_for_files(
1003 region_id: RegionId,
1004 cache_manager: CacheManagerRef,
1005 sst_meta_cache_capacity: u64,
1006 table_dir: String,
1007 path_type: PathType,
1008 object_store: object_store::ObjectStore,
1009 region_metadata: RegionMetadataRef,
1010 mut files: Vec<FileHandle>,
1011) -> usize {
1012 if !cache_manager.sst_meta_cache_enabled()
1013 || sst_meta_cache_capacity == 0
1014 || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
1015 {
1016 return 0;
1017 }
1018
1019 let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME;
1020
1021 files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));
1023
1024 let mut loaded = 0usize;
1025 for file_handle in files {
1026 if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
1028 break;
1029 }
1030
1031 let file_id = file_handle.file_id();
1032 let mut cache_metrics = MetadataCacheMetrics::default();
1033 if let Some(metadata) = cache_manager
1034 .get_parquet_meta_data(file_id, &mut cache_metrics, Default::default())
1035 .await
1036 {
1037 if file_handle.primary_key_range().is_none()
1038 && let Some(primary_key_range) =
1039 extract_primary_key_range(&metadata, ®ion_metadata)
1040 {
1041 file_handle.set_primary_key_range(primary_key_range);
1042 }
1043 if cache_metrics.mem_cache_hit == 0 {
1045 loaded += 1;
1046 }
1047 continue;
1048 }
1049
1050 if !allow_direct_load {
1051 continue;
1052 }
1053
1054 let file_size = file_handle.meta_ref().file_size;
1055 let file_path = file_handle.file_path(&table_dir, path_type);
1056 let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
1057 match loader.load(&mut cache_metrics).await {
1058 Ok(metadata) => {
1059 if let Some(primary_key_range) =
1060 extract_primary_key_range(&metadata, ®ion_metadata)
1061 {
1062 file_handle.set_primary_key_range(primary_key_range);
1063 }
1064 cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
1065 loaded += 1;
1066 }
1067 Err(err) => {
1068 warn!(
1070 err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
1071 region_id, file_path
1072 );
1073 }
1074 }
1075 }
1076
1077 loaded
1078}
1079
1080fn maybe_preload_parquet_meta_cache(
1081 region: &MitoRegionRef,
1082 config: &MitoConfig,
1083 cache_manager: &Option<CacheManagerRef>,
1084) {
1085 let Some(cache_manager) = cache_manager else {
1086 return;
1087 };
1088 if !cache_manager.sst_meta_cache_enabled() {
1089 return;
1090 }
1091
1092 if config.sst_meta_cache_size.as_bytes() == 0 {
1094 return;
1095 }
1096 if !config.preload_index_cache {
1097 return;
1098 }
1099
1100 let region = region.clone();
1101 let cache_manager = cache_manager.clone();
1102 let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();
1103
1104 tokio::spawn(async move {
1105 let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();
1107
1108 let region_id = region.region_id;
1109 let table_dir = region.access_layer.table_dir().to_string();
1110 let path_type = region.access_layer.path_type();
1111 let object_store = region.access_layer.object_store().clone();
1112 let region_metadata = region.version_control.current().version.metadata.clone();
1113
1114 let mut files = Vec::new();
1116 {
1117 let version = region.version_control.current().version;
1118 for level in version.ssts.levels() {
1119 for file_handle in level.files.values() {
1120 files.push(file_handle.clone());
1121 }
1122 }
1123 }
1124 let preloading_start = Instant::now();
1125 let loaded = preload_parquet_meta_cache_for_files(
1126 region_id,
1127 cache_manager,
1128 sst_meta_cache_capacity,
1129 table_dir,
1130 path_type,
1131 object_store,
1132 region_metadata,
1133 files,
1134 )
1135 .await;
1136 let preloading_cost = preloading_start.elapsed();
1137
1138 if loaded > 0 {
1139 info!(
1140 "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
1141 region_id,
1142 loaded,
1143 preloading_cost.as_millis()
1144 );
1145 }
1146 });
1147}
1148
1149fn can_load_cache(state: RegionRoleState) -> bool {
1150 match state {
1151 RegionRoleState::Leader(RegionLeaderState::Writable)
1152 | RegionRoleState::Leader(RegionLeaderState::Staging)
1153 | RegionRoleState::Leader(RegionLeaderState::Altering)
1154 | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
1155 | RegionRoleState::Leader(RegionLeaderState::Editing)
1156 | RegionRoleState::Follower => true,
1157 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1159 | RegionRoleState::Leader(RegionLeaderState::Dropping)
1160 | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
1161 }
1162}
1163
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_test_util::temp_dir::create_temp_dir;
    use common_time::Timestamp;
    use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use object_store::ObjectStore;
    use object_store::services::{Fs, Memory};
    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::{preload_parquet_meta_cache_for_files, sanitize_region_options};
    use crate::cache::CacheManager;
    use crate::cache::file_cache::{FileType, IndexKey};
    use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
    use crate::region::options::RegionOptions;
    use crate::sst::FormatType;
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::parquet::PARQUET_METADATA_KEY;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::sst_region_metadata;

    /// Builds an otherwise empty [`RegionManifest`] that records `sst_format`.
    fn build_test_manifest(sst_format: FormatType) -> RegionManifest {
        RegionManifest {
            metadata: Arc::new(sst_region_metadata()),
            files: HashMap::new(),
            removed_files: RemovedFilesRecord::default(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            sst_format,
            append_mode: None,
        }
    }

    /// Builds the level-0 SST [`FileHandle`] shared by the preload tests.
    ///
    /// Only `region_id`, `file_id` and `file_size` differ between the tests; every
    /// other field is fixed (3 rows, 1 row group, no indexes). The handle uses a
    /// [`NoopFilePurger`].
    fn new_test_file_handle(region_id: RegionId, file_id: FileId, file_size: u64) -> FileHandle {
        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        FileHandle::new(file_meta, Arc::new(NoopFilePurger))
    }

    /// An `sst_format` set explicitly in the options must survive sanitizing even
    /// when the manifest records a different format.
    #[test]
    fn test_sanitize_region_options_options_format_wins() {
        let manifest = build_test_manifest(FormatType::PrimaryKey);
        let mut options = RegionOptions {
            sst_format: Some(FormatType::Flat),
            ..Default::default()
        };
        sanitize_region_options(&manifest, &mut options);
        assert_eq!(options.sst_format, Some(FormatType::Flat));
    }

    /// An unset `sst_format` in the options is filled in from the manifest.
    #[test]
    fn test_sanitize_region_options_fills_from_manifest_when_unset() {
        let manifest = build_test_manifest(FormatType::Flat);
        let mut options = RegionOptions {
            sst_format: None,
            ..Default::default()
        };
        sanitize_region_options(&manifest, &mut options);
        assert_eq!(options.sst_format, Some(FormatType::Flat));
    }

    /// Encodes `batch` as an in-memory parquet file.
    ///
    /// The JSON form of the test region metadata is stored in the file's key-value
    /// metadata under [`PARQUET_METADATA_KEY`].
    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
        let key_value_meta = KeyValue::new(
            PARQUET_METADATA_KEY.to_string(),
            sst_region_metadata().to_json().unwrap(),
        );
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .build();

        let mut parquet_bytes = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();

        parquet_bytes
    }

    /// A file already downloaded into the write cache's file cache is picked up by
    /// the preload: its parquet metadata lands in the in-memory cache and the file
    /// handle gets a primary key range.
    #[tokio::test]
    async fn test_preload_parquet_meta_cache_uses_file_cache() {
        let env = TestEnv::new().await;

        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let write_cache = env
            .create_write_cache(local_store, ReadableSize::mb(1024))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        // The batch carries a primary key column so a key range can be derived from it.
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([
            ("col", col),
            (
                store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME,
                primary_key,
            ),
        ])
        .unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);
        let file_size = parquet_bytes.len() as u64;

        let file_handle = new_test_file_handle(region_id, file_id, file_size);

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
        source_store
            .write(&remote_path, parquet_bytes)
            .await
            .unwrap();

        // Pull the file from the source store into the local file cache before preloading.
        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache
            .file_cache()
            .download(index_key, &remote_path, &source_store, file_size)
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            source_store.clone(),
            Arc::new(sst_region_metadata()),
            vec![file_handle.clone()],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
        assert!(file_handle.primary_key_range().is_some());
    }

    /// With no write cache configured and the file present only in an in-memory
    /// object store, the preload loads nothing and leaves the metadata cache empty.
    #[tokio::test]
    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let file_handle = new_test_file_handle(region_id, file_id, 0);

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        // The stored bytes are not a valid parquet file.
        // NOTE(review): presumably the preload never reads this object — confirm
        // against `preload_parquet_meta_cache_for_files`.
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        object_store
            .write(&remote_path, b"noop".as_slice())
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 0);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );
    }

    /// With no write cache configured but the file stored in a local-filesystem
    /// object store, the preload loads the file's parquet metadata.
    #[tokio::test]
    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);

        // The recorded file size is 0 even though real bytes are written below.
        // NOTE(review): presumably the loader determines the size itself — confirm.
        let file_handle = new_test_file_handle(region_id, file_id, 0);

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(table_dir, path_type);

        let root = create_temp_dir("parquet-meta-preload");
        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
            .unwrap()
            .finish();
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
    }
}