use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::StreamExt;
use futures::future::BoxFuture;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

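/// Fetches the partition expression (as JSON) of a region from an external source.
/// Used to backfill metadata recovered from manifests that do not carry a
/// partition expression.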
#[async_trait::async_trait]
pub trait PartitionExprFetcher {
    async fn fetch_expr(&self, region_id: RegionId) -> Option<String>;
}

pub type PartitionExprFetcherRef = Arc<dyn PartitionExprFetcher + Send + Sync>;

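/// Builder to create a new [MitoRegion] or open an existing one.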
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
}

impl RegionOpener {
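    /// Returns a new opener for the region with `region_id` under `table_dir`.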
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
            partition_expr_fetcher,
        }
    }

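    /// Sets the metadata builder of the region to create.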
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

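    /// Creates the region if it does not exist, writing a new manifest.
    /// Otherwise, opens the existing region, verifies that the recovered
    /// metadata matches the metadata to create, and converts it to a leader.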
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let sst_format = if let Some(format) = options.sst_format {
            format
        } else if config.default_experimental_flat_format {
            FormatType::Flat
        } else {
            FormatType::PrimaryKey
        };
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
            sst_format,
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_local_file_purger(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            written_bytes: Arc::new(AtomicU64::new(0)),
            sst_format,
            stats: self.stats,
        })
    }

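    /// Opens an existing region under `table_dir`.
    ///
    /// Returns an error if the region directory is empty or the recovered
    /// region id does not match the expected one.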
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

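    /// Returns the WAL [Provider] for the region according to its WAL options,
    /// ensuring the options are compatible with the log store type `S`.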
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.clone()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

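    /// Tries to open the region and returns `None` if the region directory
    /// contains no manifest. Replays the WAL into the memtable unless
    /// `skip_wal_replay` is set.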
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = if manifest.metadata.partition_expr.is_none()
            && let Some(expr_json) = self.partition_expr_fetcher.fetch_expr(self.region_id).await
        {
            let metadata = manifest.metadata.as_ref().clone();
            let mut builder = RegionMetadataBuilder::from_existing(metadata);
            builder.partition_expr_json(Some(expr_json));
            Arc::new(builder.build().context(InvalidMetadataSnafu)?)
        } else {
            manifest.metadata.clone()
        };

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_local_file_purger(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self
            .memtable_builder_provider
            .builder_for_options(&region_options);
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));

        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
                replay_from_entry_id, region_id, manifest.manifest_version, flushed_entry_id
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
            if provider.is_remote_wal() && version_control.current().version.memtables.is_empty() {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );

            0
        };

        if let Some(committed_in_manifest) = manifest.committed_sequence {
            let committed_after_replay = version_control.committed_sequence();
            if committed_in_manifest > committed_after_replay {
                info!(
                    "Overriding committed sequence, region: {}, flushed_sequence: {}, committed_sequence: {} -> {}",
                    self.region_id,
                    version_control.current().version.flushed_sequence,
                    version_control.committed_sequence(),
                    committed_in_manifest
                );
                version_control.set_committed_sequence(committed_in_manifest);
            }
        }

        let now = self.time_provider.current_time_millis();
        let sst_format = manifest.sst_format;

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder,
            sst_format,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

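    /// Returns the [RegionManifestOptions] for the region under `region_dir`
    /// based on the engine `config` and region `options`.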
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: config.experimental_manifest_keep_removed_file_count,
                keep_ttl: config.experimental_manifest_keep_removed_file_ttl,
            },
        })
    }
}

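/// Returns the object store named `name` from the manager, or the default
/// object store if `name` is `None`.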
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.clone(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

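/// Loads region metadata from the region manifest without opening the region.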
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

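    /// Loads the metadata of the region under `region_dir`, or `None` if the
    /// region has no manifest.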
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

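    /// Loads the manifest of the region under `region_dir`, or `None` if the
    /// region has no manifest.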
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

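/// Checks that the recovered metadata matches the expected region id, column
/// metadatas and primary key.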
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

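/// Replays the WAL entries of the region starting after `flushed_entry_id` and
/// writes them into the memtable via `version_control`.
///
/// Returns the last entry id replayed. Stale entries are rejected unless
/// `allow_stale_entries` is true.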
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
                Some(mutation.sequence),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            let bulk_sequence_from_wal = part.sequence;
            ensure!(
                region_write_ctx.push_bulk(
                    OptionOutputTx::none(),
                    part,
                    Some(bulk_sequence_from_wal)
                ),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id,
        provider,
        rows_replayed,
        replay_from_entry_id,
        last_entry_id,
        series_count,
        now.elapsed()
    );
    Ok(last_entry_id)
}

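/// Returns the manifest directory under `region_dir`.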
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}