1use std::any::TypeId;
18use std::collections::HashMap;
19use std::sync::atomic::{AtomicI64, AtomicU64};
20use std::sync::{Arc, LazyLock};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, warn};
24use common_wal::options::WalOptions;
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use log_store::kafka::log_store::KafkaLogStore;
28use log_store::noop::log_store::NoopLogStore;
29use log_store::raft_engine::log_store::RaftEngineLogStore;
30use object_store::ObjectStore;
31use object_store::manager::ObjectStoreManagerRef;
32use object_store::util::{is_object_storage, normalize_dir};
33use parquet::file::metadata::PageIndexPolicy;
34use snafu::{OptionExt, ResultExt, ensure};
35use store_api::logstore::LogStore;
36use store_api::logstore::provider::Provider;
37use store_api::metadata::{
38 ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
39};
40use store_api::region_engine::RegionRole;
41use store_api::region_request::{PathType, RegionRequirements};
42use store_api::storage::{ColumnId, RegionId};
43use tokio::sync::Semaphore;
44
45use crate::access_layer::AccessLayer;
46use crate::cache::CacheManagerRef;
47use crate::cache::file_cache::{FileCache, FileType, IndexKey};
48use crate::config::MitoConfig;
49use crate::engine::region_hook::RegionHookRef;
50use crate::error;
51use crate::error::{
52 EmptyRegionDirSnafu, InvalidMetadataSnafu, InvalidRegionOptionsSnafu, ObjectStoreNotFoundSnafu,
53 RegionCorruptedSnafu, Result, StaleLogEntrySnafu,
54};
55use crate::manifest::action::RegionManifest;
56use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
57use crate::memtable::MemtableBuilderProvider;
58use crate::memtable::bulk::part::BulkPart;
59use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
60use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
61use crate::region::options::RegionOptions;
62use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
63use crate::region::{
64 ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
65};
66use crate::region_write_ctx::RegionWriteCtx;
67use crate::request::OptionOutputTx;
68use crate::schedule::scheduler::SchedulerRef;
69use crate::sst::FormatType;
70use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId};
71use crate::sst::file_purger::{FilePurgerRef, create_file_purger};
72use crate::sst::file_ref::FileReferenceManagerRef;
73use crate::sst::index::intermediate::IntermediateManager;
74use crate::sst::index::puffin_manager::PuffinManagerFactory;
75use crate::sst::location::{self, region_dir_from_table_dir};
76use crate::sst::parquet::metadata::{MetadataLoader, extract_primary_key_range};
77use crate::sst::parquet::reader::MetadataCacheMetrics;
78use crate::time_provider::TimeProviderRef;
79use crate::wal::entry_reader::WalEntryReader;
80use crate::wal::{EntryId, Wal};
81
82const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;
83
84static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
85 LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
86
87fn initial_pruned_entry_id(wal_options: &WalOptions) -> EntryId {
88 match wal_options {
89 WalOptions::Kafka(options) => options.initial_pruned_entry_id.unwrap_or(0),
90 WalOptions::RaftEngine | WalOptions::Noop => 0,
91 }
92}
93
94#[async_trait::async_trait]
100pub trait PartitionExprFetcher {
101 async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
102}
103
104pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;
105
/// Builder that opens an existing mito region, or creates it via `create_or_open`.
pub(crate) struct RegionOpener {
    /// Id of the region to open or create.
    region_id: RegionId,
    /// Metadata for a region to create; consumed by `build_metadata`.
    metadata_builder: Option<RegionMetadataBuilder>,
    /// Picks the memtable builder from the region options.
    memtable_builder_provider: MemtableBuilderProvider,
    /// Resolves the (named or default) object store selected by the region options.
    object_store_manager: ObjectStoreManagerRef,
    /// Table directory, normalized with `normalize_dir` in `new`.
    table_dir: String,
    /// Path layout used to derive the region directory and file paths.
    path_type: PathType,
    /// Scheduler handed to the region's file purger.
    purge_scheduler: SchedulerRef,
    /// Region options; must be set before the region is opened or created.
    options: Option<RegionOptions>,
    /// Optional cache manager (manifest cache, file purger, cache preloading).
    cache_manager: Option<CacheManagerRef>,
    /// When true, opening an existing region does not replay the WAL.
    skip_wal_replay: bool,
    /// Passed to the region's access layer.
    puffin_manager_factory: PuffinManagerFactory,
    /// Passed to the region's access layer.
    intermediate_manager: IntermediateManager,
    /// Source of the current time for the region's flush/compaction timestamps.
    time_provider: TimeProviderRef,
    /// Manifest statistics shared with the manifest manager and the region.
    stats: ManifestStats,
    /// Custom WAL entry reader for replay; when unset one is obtained from the `Wal`.
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    /// Lower bound for the entry id WAL replay starts after (combined with the flushed id).
    replay_checkpoint: Option<u64>,
    /// Passed to the region's file purger.
    file_ref_manager: FileReferenceManagerRef,
    /// Supplies the partition expression when the manifest metadata lacks one.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Optional hook passed to the region's manifest context.
    hook: Option<RegionHookRef>,
}
128
129impl RegionOpener {
130 #[allow(clippy::too_many_arguments)]
133 pub(crate) fn new(
134 region_id: RegionId,
135 table_dir: &str,
136 path_type: PathType,
137 memtable_builder_provider: MemtableBuilderProvider,
138 object_store_manager: ObjectStoreManagerRef,
139 purge_scheduler: SchedulerRef,
140 puffin_manager_factory: PuffinManagerFactory,
141 intermediate_manager: IntermediateManager,
142 time_provider: TimeProviderRef,
143 file_ref_manager: FileReferenceManagerRef,
144 partition_expr_fetcher: PartitionExprFetcherRef,
145 ) -> RegionOpener {
146 RegionOpener {
147 region_id,
148 metadata_builder: None,
149 memtable_builder_provider,
150 object_store_manager,
151 table_dir: normalize_dir(table_dir),
152 path_type,
153 purge_scheduler,
154 options: None,
155 cache_manager: None,
156 skip_wal_replay: false,
157 puffin_manager_factory,
158 intermediate_manager,
159 time_provider,
160 stats: Default::default(),
161 wal_entry_reader: None,
162 replay_checkpoint: None,
163 file_ref_manager,
164 partition_expr_fetcher,
165 hook: None,
166 }
167 }
168
169 pub(crate) fn hook(mut self, hook: Option<RegionHookRef>) -> Self {
171 self.hook = hook;
172 self
173 }
174
175 pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
177 self.metadata_builder = Some(builder);
178 self
179 }
180
181 fn region_dir(&self) -> String {
183 region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
184 }
185
186 fn build_metadata(&mut self) -> Result<RegionMetadata> {
192 let options = self.options.as_ref().unwrap();
193 let mut metadata_builder = self.metadata_builder.take().unwrap();
194 metadata_builder.primary_key_encoding(options.primary_key_encoding());
195 metadata_builder.build().context(InvalidMetadataSnafu)
196 }
197
198 pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
200 let region_id = self.region_id;
201 self.options(RegionOptions::try_from_options(region_id, &options)?)
202 }
203
204 pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
206 self.replay_checkpoint = replay_checkpoint;
207 self
208 }
209
210 pub(crate) fn wal_entry_reader(
213 mut self,
214 wal_entry_reader: Option<Box<dyn WalEntryReader>>,
215 ) -> Self {
216 self.wal_entry_reader = wal_entry_reader;
217 self
218 }
219
220 pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
222 options.validate()?;
223 self.options = Some(options);
224 Ok(self)
225 }
226
227 pub(crate) fn ensure_region_requirements(
229 &self,
230 requirements: RegionRequirements,
231 ) -> Result<()> {
232 if !requirements.object_storage {
233 return Ok(());
234 }
235
236 let options = self.options.as_ref().context(InvalidRegionOptionsSnafu {
237 reason: "missing region options before requirement check".to_string(),
238 })?;
239 let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
240
241 ensure!(
242 supports_open_region_object_storage_requirement(&object_store),
243 error::RegionRequirementSnafu {
244 region_id: self.region_id,
245 requirement: "object storage",
246 reason: "region data must be accessible from another datanode",
247 }
248 );
249
250 Ok(())
251 }
252
253 pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
255 self.cache_manager = cache_manager;
256 self
257 }
258
259 pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
261 self.skip_wal_replay = skip;
262 self
263 }
264
265 pub(crate) async fn create_or_open<S: LogStore>(
272 mut self,
273 config: &MitoConfig,
274 wal: &Wal<S>,
275 ) -> Result<MitoRegionRef> {
276 let region_id = self.region_id;
277 let region_dir = self.region_dir();
278 let metadata = self.build_metadata()?;
279 match self.maybe_open(config, wal).await {
281 Ok(Some(region)) => {
282 let recovered = region.metadata();
283 let expect = &metadata;
285 check_recovered_region(
286 &recovered,
287 expect.region_id,
288 &expect.column_metadatas,
289 &expect.primary_key,
290 )?;
291 region.set_role(RegionRole::Leader);
293
294 return Ok(region);
295 }
296 Ok(None) => {
297 debug!(
298 "No data under directory {}, region_id: {}",
299 region_dir, self.region_id
300 );
301 }
302 Err(e) => {
303 warn!(e;
304 "Failed to open region {} before creating it, region_dir: {}",
305 self.region_id, region_dir
306 );
307 }
308 }
309 let mut options = self.options.take().unwrap();
311 let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
312 let provider = self.provider::<S>(&options.wal_options)?;
313 let metadata = Arc::new(metadata);
314 let sst_format = if let Some(format) = options.sst_format {
316 format
317 } else if config.default_flat_format {
318 options.sst_format = Some(FormatType::Flat);
319 FormatType::Flat
320 } else {
321 options.sst_format = Some(FormatType::PrimaryKey);
323 FormatType::PrimaryKey
324 };
325 let mut region_manifest_options =
327 RegionManifestOptions::new(config, ®ion_dir, &object_store);
328 region_manifest_options.manifest_cache = self
330 .cache_manager
331 .as_ref()
332 .and_then(|cm| cm.write_cache())
333 .and_then(|wc| wc.manifest_cache());
334 let flushed_entry_id = provider
337 .initial_flushed_entry_id::<S>(wal.store())
338 .max(initial_pruned_entry_id(&options.wal_options));
339 let manifest_manager = RegionManifestManager::new(
340 metadata.clone(),
341 flushed_entry_id,
342 region_manifest_options,
343 sst_format,
344 &self.stats,
345 )
346 .await?;
347
348 let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
349 let part_duration = options.compaction.time_window();
350 let mutable = Arc::new(TimePartitions::new(
352 metadata.clone(),
353 memtable_builder.clone(),
354 0,
355 part_duration,
356 ));
357
358 debug!(
359 "Create region {} with options: {:?}, default_flat_format: {}",
360 region_id, options, config.default_flat_format
361 );
362
363 let version = VersionBuilder::new(metadata, mutable)
364 .options(options)
365 .flushed_entry_id(flushed_entry_id)
366 .build();
367 let version_control = Arc::new(VersionControl::new(version));
368 let access_layer = Arc::new(AccessLayer::new(
369 self.table_dir.clone(),
370 self.path_type,
371 object_store,
372 self.puffin_manager_factory,
373 self.intermediate_manager,
374 ));
375 let now = self.time_provider.current_time_millis();
376
377 Ok(Arc::new(MitoRegion {
378 region_id,
379 version_control,
380 access_layer: access_layer.clone(),
381 manifest_ctx: Arc::new(ManifestContext::new(
383 manifest_manager,
384 RegionRoleState::Leader(RegionLeaderState::Writable),
385 self.hook.clone(),
386 )),
387 file_purger: create_file_purger(
388 config.gc.enable,
389 self.path_type,
390 self.purge_scheduler,
391 access_layer,
392 self.cache_manager,
393 self.file_ref_manager.clone(),
394 ),
395 provider,
396 last_flush_millis: AtomicI64::new(now),
397 last_schedule_compaction_millis: AtomicI64::new(now),
398 time_provider: self.time_provider.clone(),
399 topic_latest_entry_id: AtomicU64::new(flushed_entry_id),
400 written_bytes: Arc::new(AtomicU64::new(0)),
401 stats: self.stats,
402 }))
403 }
404
405 pub(crate) async fn open<S: LogStore>(
409 mut self,
410 config: &MitoConfig,
411 wal: &Wal<S>,
412 ) -> Result<MitoRegionRef> {
413 let region_id = self.region_id;
414 let region_dir = self.region_dir();
415 let region = self
416 .maybe_open(config, wal)
417 .await?
418 .with_context(|| EmptyRegionDirSnafu {
419 region_id,
420 region_dir: ®ion_dir,
421 })?;
422
423 ensure!(
424 region.region_id == self.region_id,
425 RegionCorruptedSnafu {
426 region_id: self.region_id,
427 reason: format!(
428 "recovered region has different region id {}",
429 region.region_id
430 ),
431 }
432 );
433
434 Ok(region)
435 }
436
437 fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
438 match wal_options {
439 WalOptions::RaftEngine => {
440 ensure!(
441 TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>()
442 || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
443 error::IncompatibleWalProviderChangeSnafu {
444 global: "`kafka`",
445 region: "`raft_engine`",
446 }
447 );
448 Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
449 }
450 WalOptions::Kafka(options) => {
451 ensure!(
452 TypeId::of::<KafkaLogStore>() == TypeId::of::<S>()
453 || TypeId::of::<NoopLogStore>() == TypeId::of::<S>(),
454 error::IncompatibleWalProviderChangeSnafu {
455 global: "`raft_engine`",
456 region: "`kafka`",
457 }
458 );
459 Ok(Provider::kafka_provider(options.topic.clone()))
460 }
461 WalOptions::Noop => Ok(Provider::noop_provider()),
462 }
463 }
464
465 async fn maybe_open<S: LogStore>(
467 &mut self,
468 config: &MitoConfig,
469 wal: &Wal<S>,
470 ) -> Result<Option<MitoRegionRef>> {
471 let now = Instant::now();
472 let mut region_options = self.options.as_ref().unwrap().clone();
473 let object_storage = get_object_store(®ion_options.storage, &self.object_store_manager)?;
474 let mut region_manifest_options =
475 RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
476 region_manifest_options.manifest_cache = self
478 .cache_manager
479 .as_ref()
480 .and_then(|cm| cm.write_cache())
481 .and_then(|wc| wc.manifest_cache());
482 let Some(manifest_manager) =
483 RegionManifestManager::open(region_manifest_options, &self.stats).await?
484 else {
485 return Ok(None);
486 };
487
488 let manifest = manifest_manager.manifest();
490 let metadata = if manifest.metadata.partition_expr.is_none()
491 && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
492 {
493 let metadata = manifest.metadata.as_ref().clone();
494 let mut builder = RegionMetadataBuilder::from_existing(metadata);
495 builder.partition_expr_json(Some(expr_json));
496 Arc::new(builder.build().context(InvalidMetadataSnafu)?)
497 } else {
498 manifest.metadata.clone()
499 };
500 sanitize_region_options(&manifest, &mut region_options);
502
503 let region_id = self.region_id;
504 let provider = self.provider::<S>(®ion_options.wal_options)?;
505 let wal_entry_reader = self
506 .wal_entry_reader
507 .take()
508 .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
509 let on_region_opened = wal.on_region_opened();
510 let object_store = get_object_store(®ion_options.storage, &self.object_store_manager)?;
511
512 debug!(
513 "Open region {} at {} with options: {:?}",
514 region_id, self.table_dir, self.options
515 );
516
517 let access_layer = Arc::new(AccessLayer::new(
518 self.table_dir.clone(),
519 self.path_type,
520 object_store,
521 self.puffin_manager_factory.clone(),
522 self.intermediate_manager.clone(),
523 ));
524 let file_purger = create_file_purger(
525 config.gc.enable,
526 self.path_type,
527 self.purge_scheduler.clone(),
528 access_layer.clone(),
529 self.cache_manager.clone(),
530 self.file_ref_manager.clone(),
531 );
532 let memtable_builder = self
534 .memtable_builder_provider
535 .builder_for_options(®ion_options);
536 let part_duration = region_options
539 .compaction
540 .time_window()
541 .or(manifest.compaction_time_window);
542 let mutable = Arc::new(TimePartitions::new(
544 metadata.clone(),
545 memtable_builder.clone(),
546 0,
547 part_duration,
548 ));
549
550 let version_builder = version_builder_from_manifest(
552 &manifest,
553 metadata,
554 file_purger.clone(),
555 mutable,
556 region_options,
557 );
558 let version = version_builder.build();
559 let flushed_entry_id = version.flushed_entry_id;
560 let version_control = Arc::new(VersionControl::new(version));
561
562 let replay_from_entry_id = self
563 .replay_checkpoint
564 .unwrap_or_default()
565 .max(flushed_entry_id);
566 let topic_latest_entry_id = if !self.skip_wal_replay {
567 info!(
568 "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}, elapsed: {:?}",
569 replay_from_entry_id,
570 region_id,
571 manifest.manifest_version,
572 flushed_entry_id,
573 now.elapsed()
574 );
575 replay_memtable(
576 &provider,
577 wal_entry_reader,
578 region_id,
579 replay_from_entry_id,
580 &version_control,
581 config.allow_stale_entries,
582 on_region_opened,
583 )
584 .await?;
585 if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
589 wal.store()
590 .latest_entry_id(&provider)
591 .unwrap_or(replay_from_entry_id)
592 } else if provider.is_remote_wal() {
593 replay_from_entry_id
594 } else {
595 0
596 }
597 } else {
598 info!(
599 "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}, elapsed: {:?}",
600 region_id,
601 manifest.manifest_version,
602 flushed_entry_id,
603 now.elapsed()
604 );
605
606 if provider.is_remote_wal() {
607 replay_from_entry_id
608 } else {
609 0
610 }
611 };
612
613 if let Some(committed_in_manifest) = manifest.committed_sequence {
614 let committed_after_replay = version_control.committed_sequence();
615 if committed_in_manifest > committed_after_replay {
616 info!(
617 "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
618 self.region_id,
619 version_control.current().version.flushed_sequence,
620 version_control.committed_sequence(),
621 committed_in_manifest
622 );
623 version_control.set_committed_sequence(committed_in_manifest);
624 }
625 }
626
627 let now = self.time_provider.current_time_millis();
628
629 let region = MitoRegion {
630 region_id: self.region_id,
631 version_control: version_control.clone(),
632 access_layer: access_layer.clone(),
633 manifest_ctx: Arc::new(ManifestContext::new(
635 manifest_manager,
636 RegionRoleState::Follower,
637 self.hook.clone(),
638 )),
639 file_purger,
640 provider: provider.clone(),
641 last_flush_millis: AtomicI64::new(now),
642 last_schedule_compaction_millis: AtomicI64::new(now),
643 time_provider: self.time_provider.clone(),
644 topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
645 written_bytes: Arc::new(AtomicU64::new(0)),
646 stats: self.stats.clone(),
647 };
648
649 let region = Arc::new(region);
650
651 maybe_load_cache(®ion, config, &self.cache_manager);
652 maybe_preload_parquet_meta_cache(®ion, config, &self.cache_manager);
653
654 Ok(Some(region))
655 }
656}
657
658#[cfg(not(feature = "test-shared-fs-region-migration"))]
659fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool {
660 is_object_storage(object_store)
661}
662
663#[cfg(feature = "test-shared-fs-region-migration")]
664fn supports_open_region_object_storage_requirement(object_store: &ObjectStore) -> bool {
665 is_object_storage(object_store)
670 || object_store.info().scheme() == object_store::services::FS_SCHEME
671}
672
673pub(crate) fn version_builder_from_manifest(
675 manifest: &RegionManifest,
676 metadata: RegionMetadataRef,
677 file_purger: FilePurgerRef,
678 mutable: TimePartitionsRef,
679 region_options: RegionOptions,
680) -> VersionBuilder {
681 VersionBuilder::new(metadata, mutable)
682 .add_files(file_purger, manifest.files.values().cloned())
683 .flushed_entry_id(manifest.flushed_entry_id)
684 .flushed_sequence(manifest.flushed_sequence)
685 .truncated_entry_id(manifest.truncated_entry_id)
686 .compaction_time_window(manifest.compaction_time_window)
687 .options(region_options)
688}
689
690pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
692 match options.sst_format {
697 Some(format) if format != manifest.sst_format => {
698 common_telemetry::warn!(
699 "Overriding SST format from {:?} (manifest) to {:?} (options) for region {}",
700 manifest.sst_format,
701 format,
702 manifest.metadata.region_id,
703 );
704 }
705 Some(_) => {}
706 None => {
707 options.sst_format = Some(manifest.sst_format);
708 }
709 }
710 if let Some(manifest_append_mode) = manifest.append_mode
711 && options.append_mode != manifest_append_mode
712 {
713 common_telemetry::warn!(
714 "Overriding append_mode from {} to {} for region {}",
715 options.append_mode,
716 manifest_append_mode,
717 manifest.metadata.region_id,
718 );
719 options.append_mode = manifest_append_mode;
720 }
721 if options.append_mode && options.merge_mode.take().is_some() {
722 common_telemetry::warn!(
723 "Ignoring merge_mode because append_mode is enabled for region {}",
724 manifest.metadata.region_id,
725 );
726 }
727}
728
729pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
731 let append_mode_enabled = options
732 .get("append_mode")
733 .is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));
734
735 if append_mode_enabled && options.remove("merge_mode").is_some() {
736 common_telemetry::warn!(
737 "Ignoring merge_mode in open request options because append_mode is enabled"
738 );
739 }
740}
741
742pub fn get_object_store(
744 name: &Option<String>,
745 object_store_manager: &ObjectStoreManagerRef,
746) -> Result<object_store::ObjectStore> {
747 if let Some(name) = name {
748 Ok(object_store_manager
749 .find(name)
750 .with_context(|| ObjectStoreNotFoundSnafu {
751 object_store: name.clone(),
752 })?
753 .clone())
754 } else {
755 Ok(object_store_manager.default_object_store().clone())
756 }
757}
758
759pub(crate) fn check_recovered_region(
761 recovered: &RegionMetadata,
762 region_id: RegionId,
763 column_metadatas: &[ColumnMetadata],
764 primary_key: &[ColumnId],
765) -> Result<()> {
766 if recovered.region_id != region_id {
767 error!(
768 "Recovered region {}, expect region {}",
769 recovered.region_id, region_id
770 );
771 return RegionCorruptedSnafu {
772 region_id,
773 reason: format!(
774 "recovered metadata has different region id {}",
775 recovered.region_id
776 ),
777 }
778 .fail();
779 }
780 if recovered.column_metadatas != column_metadatas {
781 error!(
782 "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
783 recovered.region_id, recovered.column_metadatas, column_metadatas
784 );
785
786 return RegionCorruptedSnafu {
787 region_id,
788 reason: "recovered metadata has different schema",
789 }
790 .fail();
791 }
792 if recovered.primary_key != primary_key {
793 error!(
794 "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
795 recovered.region_id, recovered.primary_key, primary_key
796 );
797
798 return RegionCorruptedSnafu {
799 region_id,
800 reason: "recovered metadata has different primary key",
801 }
802 .fail();
803 }
804
805 Ok(())
806}
807
808pub(crate) async fn replay_memtable<F>(
810 provider: &Provider,
811 mut wal_entry_reader: Box<dyn WalEntryReader>,
812 region_id: RegionId,
813 flushed_entry_id: EntryId,
814 version_control: &VersionControlRef,
815 allow_stale_entries: bool,
816 on_region_opened: F,
817) -> Result<EntryId>
818where
819 F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
820{
821 let now = Instant::now();
822 let mut rows_replayed = 0;
823 let mut last_entry_id = flushed_entry_id;
826 let replay_from_entry_id = flushed_entry_id + 1;
827
828 let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
829 while let Some(res) = wal_stream.next().await {
830 let (entry_id, entry) = res?;
831 if entry_id <= flushed_entry_id {
832 warn!(
833 "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
834 region_id, flushed_entry_id, entry_id
835 );
836 ensure!(
837 allow_stale_entries,
838 StaleLogEntrySnafu {
839 region_id,
840 flushed_entry_id,
841 unexpected_entry_id: entry_id,
842 }
843 );
844 }
845 last_entry_id = last_entry_id.max(entry_id);
846
847 let mut region_write_ctx = RegionWriteCtx::new(
848 region_id,
849 version_control,
850 provider.clone(),
851 None,
853 );
854 for mutation in entry.mutations {
855 rows_replayed += mutation
856 .rows
857 .as_ref()
858 .map(|rows| rows.rows.len())
859 .unwrap_or(0);
860 region_write_ctx.push_mutation(
861 mutation.op_type,
862 mutation.rows,
863 mutation.write_hint,
864 OptionOutputTx::none(),
865 Some(mutation.sequence),
867 );
868 }
869
870 for bulk_entry in entry.bulk_entries {
871 let part = BulkPart::try_from(bulk_entry)?;
872 rows_replayed += part.num_rows();
873 let bulk_sequence_from_wal = part.sequence;
875 ensure!(
876 region_write_ctx.push_bulk(
877 OptionOutputTx::none(),
878 part,
879 Some(bulk_sequence_from_wal)
880 ),
881 RegionCorruptedSnafu {
882 region_id,
883 reason: "unable to replay memtable with bulk entries",
884 }
885 );
886 }
887
888 region_write_ctx.set_next_entry_id(last_entry_id + 1);
890 region_write_ctx.write_memtable().await;
891 region_write_ctx.write_bulk().await;
892 }
893
894 (on_region_opened)(region_id, flushed_entry_id, provider).await?;
897
898 let series_count = version_control.current().series_count();
899 info!(
900 "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
901 region_id,
902 provider,
903 rows_replayed,
904 replay_from_entry_id,
905 last_entry_id,
906 series_count,
907 now.elapsed()
908 );
909 Ok(last_entry_id)
910}
911
912pub(crate) struct RegionLoadCacheTask {
914 region: MitoRegionRef,
915}
916
917impl RegionLoadCacheTask {
918 pub(crate) fn new(region: MitoRegionRef) -> Self {
919 Self { region }
920 }
921
922 pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
924 let region_id = self.region.region_id;
925 let table_dir = self.region.access_layer.table_dir();
926 let path_type = self.region.access_layer.path_type();
927 let object_store = self.region.access_layer.object_store();
928 let version_control = &self.region.version_control;
929
930 let mut files_to_download = Vec::new();
932 let mut files_already_cached = 0;
933
934 {
935 let version = version_control.current().version;
936 for level in version.ssts.levels() {
937 for file_handle in level.files.values() {
938 let file_meta = file_handle.meta_ref();
939 if file_meta.exists_index() {
940 let puffin_key = IndexKey::new(
941 file_meta.region_id,
942 file_meta.file_id,
943 FileType::Puffin(file_meta.index_version),
944 );
945
946 if !file_cache.contains_key(&puffin_key) {
947 files_to_download.push((
948 puffin_key,
949 file_meta.index_file_size,
950 file_meta.time_range.1, ));
952 } else {
953 files_already_cached += 1;
954 }
955 }
956 }
957 }
958 }
961
962 files_to_download.sort_by_key(|b| std::cmp::Reverse(b.2));
964
965 let total_files = files_to_download.len() as i64;
966
967 info!(
968 "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
969 region_id, total_files, files_already_cached
970 );
971
972 CACHE_FILL_PENDING_FILES.add(total_files);
973
974 let mut files_downloaded = 0;
975 let mut files_skipped = 0;
976
977 for (puffin_key, file_size, max_timestamp) in files_to_download {
978 let current_size = file_cache.puffin_cache_size();
979 let capacity = file_cache.puffin_cache_capacity();
980 let region_state = self.region.state();
981 if !can_load_cache(region_state) {
982 info!(
983 "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
984 region_state, region_id, current_size, capacity
985 );
986 break;
987 }
988
989 if current_size + file_size > capacity {
991 info!(
992 "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}, file_timestamp: {:?}",
993 region_id, puffin_key.file_id, current_size, file_size, capacity, max_timestamp
994 );
995 files_skipped = (total_files - files_downloaded) as usize;
996 CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
997 break;
998 }
999
1000 let index_version = if let FileType::Puffin(version) = puffin_key.file_type {
1001 version
1002 } else {
1003 unreachable!("`files_to_download` should only contains Puffin files");
1004 };
1005 let index_id = RegionIndexId::new(
1006 RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
1007 index_version,
1008 );
1009
1010 let index_remote_path = location::index_file_path(table_dir, index_id, path_type);
1011
1012 match file_cache
1013 .download(puffin_key, &index_remote_path, object_store, file_size)
1014 .await
1015 {
1016 Ok(_) => {
1017 debug!(
1018 "Downloaded index file to write cache, region: {}, file_id: {}",
1019 region_id, puffin_key.file_id
1020 );
1021 files_downloaded += 1;
1022 CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
1023 CACHE_FILL_PENDING_FILES.dec();
1024 }
1025 Err(e) => {
1026 warn!(
1027 e; "Failed to download index file to write cache, region: {}, file_id: {}",
1028 region_id, puffin_key.file_id
1029 );
1030 CACHE_FILL_PENDING_FILES.dec();
1031 }
1032 }
1033 }
1034
1035 info!(
1036 "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
1037 region_id, total_files, files_downloaded, files_already_cached, files_skipped
1038 );
1039 }
1040}
1041
1042fn maybe_load_cache(
1044 region: &MitoRegionRef,
1045 config: &MitoConfig,
1046 cache_manager: &Option<CacheManagerRef>,
1047) {
1048 let Some(cache_manager) = cache_manager else {
1049 return;
1050 };
1051 let Some(write_cache) = cache_manager.write_cache() else {
1052 return;
1053 };
1054
1055 let preload_enabled = config.preload_index_cache;
1056 if !preload_enabled {
1057 return;
1058 }
1059
1060 let task = RegionLoadCacheTask::new(region.clone());
1061 write_cache.load_region_cache(task);
1062}
1063
1064#[allow(clippy::too_many_arguments)]
1075async fn preload_parquet_meta_cache_for_files(
1076 region_id: RegionId,
1077 cache_manager: CacheManagerRef,
1078 sst_meta_cache_capacity: u64,
1079 table_dir: String,
1080 path_type: PathType,
1081 object_store: object_store::ObjectStore,
1082 region_metadata: RegionMetadataRef,
1083 mut files: Vec<FileHandle>,
1084) -> usize {
1085 if !cache_manager.sst_meta_cache_enabled()
1086 || sst_meta_cache_capacity == 0
1087 || cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
1088 {
1089 return 0;
1090 }
1091
1092 let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME;
1093
1094 files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));
1096
1097 let mut loaded = 0usize;
1098 for file_handle in files {
1099 if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
1101 break;
1102 }
1103
1104 let file_id = file_handle.file_id();
1105 let mut cache_metrics = MetadataCacheMetrics::default();
1106 if let Some(metadata) = cache_manager
1107 .get_parquet_meta_data(file_id, &mut cache_metrics, PageIndexPolicy::Optional)
1108 .await
1109 {
1110 if file_handle.primary_key_range().is_none()
1111 && let Some(primary_key_range) =
1112 extract_primary_key_range(&metadata, ®ion_metadata)
1113 {
1114 file_handle.set_primary_key_range(primary_key_range);
1115 }
1116 if cache_metrics.mem_cache_hit == 0 {
1118 loaded += 1;
1119 }
1120 continue;
1121 }
1122
1123 if !allow_direct_load {
1124 continue;
1125 }
1126
1127 let file_size = file_handle.meta_ref().file_size;
1128 let file_path = file_handle.file_path(&table_dir, path_type);
1129 let mut loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
1130 loader.with_page_index_policy(PageIndexPolicy::Optional);
1131 match loader.load(&mut cache_metrics).await {
1132 Ok(metadata) => {
1133 if let Some(primary_key_range) =
1134 extract_primary_key_range(&metadata, ®ion_metadata)
1135 {
1136 file_handle.set_primary_key_range(primary_key_range);
1137 }
1138 cache_manager.put_parquet_meta_data(file_id, Arc::new(metadata), None);
1139 loaded += 1;
1140 }
1141 Err(err) => {
1142 warn!(
1144 err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
1145 region_id, file_path
1146 );
1147 }
1148 }
1149 }
1150
1151 loaded
1152}
1153
1154fn maybe_preload_parquet_meta_cache(
1155 region: &MitoRegionRef,
1156 config: &MitoConfig,
1157 cache_manager: &Option<CacheManagerRef>,
1158) {
1159 let Some(cache_manager) = cache_manager else {
1160 return;
1161 };
1162 if !cache_manager.sst_meta_cache_enabled() {
1163 return;
1164 }
1165
1166 if config.sst_meta_cache_size.as_bytes() == 0 {
1168 return;
1169 }
1170 if !config.preload_index_cache {
1171 return;
1172 }
1173
1174 let region = region.clone();
1175 let cache_manager = cache_manager.clone();
1176 let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();
1177
1178 tokio::spawn(async move {
1179 let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();
1181
1182 let region_id = region.region_id;
1183 let table_dir = region.access_layer.table_dir().to_string();
1184 let path_type = region.access_layer.path_type();
1185 let object_store = region.access_layer.object_store().clone();
1186 let region_metadata = region.version_control.current().version.metadata.clone();
1187
1188 let mut files = Vec::new();
1190 {
1191 let version = region.version_control.current().version;
1192 for level in version.ssts.levels() {
1193 for file_handle in level.files.values() {
1194 files.push(file_handle.clone());
1195 }
1196 }
1197 }
1198 let preloading_start = Instant::now();
1199 let loaded = preload_parquet_meta_cache_for_files(
1200 region_id,
1201 cache_manager,
1202 sst_meta_cache_capacity,
1203 table_dir,
1204 path_type,
1205 object_store,
1206 region_metadata,
1207 files,
1208 )
1209 .await;
1210 let preloading_cost = preloading_start.elapsed();
1211
1212 if loaded > 0 {
1213 info!(
1214 "Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
1215 region_id,
1216 loaded,
1217 preloading_cost.as_millis()
1218 );
1219 }
1220 });
1221}
1222
1223fn can_load_cache(state: RegionRoleState) -> bool {
1224 match state {
1225 RegionRoleState::Leader(RegionLeaderState::Writable)
1226 | RegionRoleState::Leader(RegionLeaderState::Staging)
1227 | RegionRoleState::Leader(RegionLeaderState::Altering)
1228 | RegionRoleState::Leader(RegionLeaderState::EnteringStaging)
1229 | RegionRoleState::Leader(RegionLeaderState::Editing)
1230 | RegionRoleState::Follower => true,
1231 RegionRoleState::Leader(RegionLeaderState::Downgrading)
1233 | RegionRoleState::Leader(RegionLeaderState::Dropping)
1234 | RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
1235 }
1236}
1237
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_test_util::temp_dir::create_temp_dir;
    use common_time::Timestamp;
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use datatypes::arrow::array::{ArrayRef, BinaryArray, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use object_store::ObjectStore;
    use object_store::services::{Fs, Memory, S3};
    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;
    use store_api::region_request::PathType;
    use store_api::storage::{FileId, RegionId};

    use super::{
        initial_pruned_entry_id, preload_parquet_meta_cache_for_files, sanitize_region_options,
        supports_open_region_object_storage_requirement,
    };
    use crate::cache::CacheManager;
    use crate::cache::file_cache::{FileType, IndexKey};
    use crate::manifest::action::{RegionManifest, RemovedFilesRecord};
    use crate::region::options::RegionOptions;
    use crate::sst::FormatType;
    use crate::sst::file::{FileHandle, FileMeta};
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::parquet::PARQUET_METADATA_KEY;
    use crate::test_util::TestEnv;
    use crate::test_util::sst_util::sst_region_metadata;

    /// Builds an empty manifest for the test SST region metadata using `sst_format`.
    fn build_test_manifest(sst_format: FormatType) -> RegionManifest {
        RegionManifest {
            metadata: Arc::new(sst_region_metadata()),
            files: HashMap::new(),
            removed_files: RemovedFilesRecord::default(),
            flushed_entry_id: 0,
            flushed_sequence: 0,
            committed_sequence: None,
            manifest_version: 0,
            truncated_entry_id: None,
            compaction_time_window: None,
            sst_format,
            append_mode: None,
        }
    }

    /// Builds a local-filesystem object store. Only its scheme matters to the
    /// tests that use it; nothing is read or written through it.
    fn build_fs_object_store() -> ObjectStore {
        ObjectStore::new(Fs::default().root("/tmp"))
            .unwrap()
            .finish()
    }

    /// Builds a level-0 [`FileHandle`] for `file_id` in `region_id` with the given
    /// `file_size`. All other metadata fields use fixed values shared by the
    /// preload tests (3 rows in 1 row group, time range `[0, 1]` ms).
    fn new_test_file_handle(region_id: RegionId, file_id: FileId, file_size: u64) -> FileHandle {
        let file_meta = FileMeta {
            region_id,
            file_id,
            time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
            level: 0,
            file_size,
            max_row_group_uncompressed_size: 0,
            available_indexes: Default::default(),
            indexes: vec![],
            index_file_size: 0,
            index_version: 0,
            num_rows: 3,
            num_row_groups: 1,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        FileHandle::new(file_meta, Arc::new(NoopFilePurger))
    }

    #[test]
    fn test_initial_pruned_entry_id() {
        assert_eq!(0, initial_pruned_entry_id(&WalOptions::RaftEngine));
        assert_eq!(0, initial_pruned_entry_id(&WalOptions::Noop));
        assert_eq!(
            0,
            initial_pruned_entry_id(&WalOptions::Kafka(KafkaWalOptions::new(
                "test_topic".to_string()
            )))
        );
        assert_eq!(
            42,
            initial_pruned_entry_id(&WalOptions::Kafka(KafkaWalOptions {
                topic: "test_topic".to_string(),
                initial_pruned_entry_id: Some(42),
            }))
        );
    }

    #[test]
    #[cfg(not(feature = "test-shared-fs-region-migration"))]
    fn test_open_requirement_rejects_fs_object_store() {
        let object_store = build_fs_object_store();

        assert!(!supports_open_region_object_storage_requirement(
            &object_store
        ));
    }

    #[test]
    #[cfg(feature = "test-shared-fs-region-migration")]
    fn test_open_requirement_accepts_shared_fs_object_store_for_tests() {
        let object_store = build_fs_object_store();

        assert!(supports_open_region_object_storage_requirement(
            &object_store
        ));
    }

    #[test]
    fn test_open_requirement_accepts_s3_object_store() {
        let object_store = ObjectStore::new(
            S3::default()
                .bucket("test-bucket")
                .region("us-east-1")
                .disable_ec2_metadata(),
        )
        .unwrap()
        .finish();

        assert!(supports_open_region_object_storage_requirement(
            &object_store
        ));
    }

    /// An explicitly configured SST format in the options is kept even when the
    /// manifest records a different one.
    #[test]
    fn test_sanitize_region_options_options_format_wins() {
        let manifest = build_test_manifest(FormatType::PrimaryKey);
        let mut options = RegionOptions {
            sst_format: Some(FormatType::Flat),
            ..Default::default()
        };
        sanitize_region_options(&manifest, &mut options);
        assert_eq!(options.sst_format, Some(FormatType::Flat));
    }

    /// When the options leave the SST format unset, it is filled from the manifest.
    #[test]
    fn test_sanitize_region_options_fills_from_manifest_when_unset() {
        let manifest = build_test_manifest(FormatType::Flat);
        let mut options = RegionOptions {
            sst_format: None,
            ..Default::default()
        };
        sanitize_region_options(&manifest, &mut options);
        assert_eq!(options.sst_format, Some(FormatType::Flat));
    }

    /// Encodes `batch` as a parquet file whose key-value metadata carries the test
    /// region metadata under `PARQUET_METADATA_KEY`, and returns the file bytes.
    fn sst_parquet_bytes(batch: &RecordBatch) -> Vec<u8> {
        let key_value_meta = KeyValue::new(
            PARQUET_METADATA_KEY.to_string(),
            sst_region_metadata().to_json().unwrap(),
        );
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![key_value_meta]))
            .build();

        let mut parquet_bytes = Vec::new();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), Some(props)).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();

        parquet_bytes
    }

    /// With a remote (non-fs) object store, preloading picks the metadata up from
    /// the local file cache. It populates both in-memory metadata caches and the
    /// file's primary key range.
    #[tokio::test]
    async fn test_preload_parquet_meta_cache_uses_file_cache() {
        let env = TestEnv::new().await;

        let local_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let write_cache = env
            .create_write_cache(local_store, ReadableSize::mb(1024))
            .await;
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .write_cache(Some(write_cache.clone()))
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let primary_key = Arc::new(BinaryArray::from_iter_values([b"a", b"b", b"c"])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([
            ("col", col),
            (
                store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME,
                primary_key,
            ),
        ])
        .unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);
        let file_size = parquet_bytes.len() as u64;
        let file_handle = new_test_file_handle(region_id, file_id, file_size);

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        let source_store = ObjectStore::new(Memory::default()).unwrap().finish();
        source_store
            .write(&remote_path, parquet_bytes)
            .await
            .unwrap();

        // Stage the SST in the local file cache so the preload can read it from there.
        let index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
        write_cache
            .file_cache()
            .download(index_key, &remote_path, &source_store, file_size)
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        // `source_store` is a memory store, so direct loading from the object store
        // is not allowed; the metadata has to come through the cache manager.
        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            source_store.clone(),
            Arc::new(sst_region_metadata()),
            vec![file_handle.clone()],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
        assert!(
            cache_manager
                .get_sst_meta_data_from_mem_cache(
                    region_file_id,
                    parquet::file::metadata::PageIndexPolicy::Optional,
                )
                .is_some()
        );
        // The SST contains a primary key column, so preloading also fills the range.
        assert!(file_handle.primary_key_range().is_some());
    }

    /// A file that is neither cached nor reachable by direct loading (memory store,
    /// no write cache) is skipped without populating the metadata cache.
    #[tokio::test]
    async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        let file_handle = new_test_file_handle(region_id, file_id, 0);

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let remote_path = file_handle.file_path(table_dir, path_type);

        // The object exists remotely, but it is not valid parquet and is never read.
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        object_store
            .write(&remote_path, b"noop".as_slice())
            .await
            .unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 0);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );
    }

    /// With a local filesystem object store, preloading reads the parquet
    /// metadata directly from the store and inserts it into the in-memory cache.
    #[tokio::test]
    async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
        let cache_manager = Arc::new(
            CacheManager::builder()
                .sst_meta_cache_size(1024 * 1024)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let parquet_bytes = sst_parquet_bytes(&batch);

        let file_handle = new_test_file_handle(region_id, file_id, 0);

        let table_dir = "test_table";
        let path_type = PathType::Bare;
        let file_path = file_handle.file_path(table_dir, path_type);

        let root = create_temp_dir("parquet-meta-preload");
        let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
            .unwrap()
            .finish();
        object_store.write(&file_path, parquet_bytes).await.unwrap();

        let region_file_id = file_handle.file_id();
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_none()
        );

        let loaded = preload_parquet_meta_cache_for_files(
            region_id,
            cache_manager.clone(),
            1024 * 1024,
            table_dir.to_string(),
            path_type,
            object_store,
            Arc::new(sst_region_metadata()),
            vec![file_handle],
        )
        .await;

        assert_eq!(loaded, 1);
        assert!(
            cache_manager
                .get_parquet_meta_data_from_mem_cache(region_file_id)
                .is_some()
        );
    }
}