use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use std::time::Instant;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

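/// Builder to create a new [MitoRegion] or open an existing one.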
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    replay_checkpoint: Option<u64>,
    file_ref_manager: FileReferenceManagerRef,
}

impl RegionOpener {
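    /// Returns a new opener for the region.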
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
            replay_checkpoint: None,
            file_ref_manager,
        }
    }

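    /// Sets the metadata builder of the region to create.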
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

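    /// Returns the region directory derived from the table directory, region id and path type.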
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

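    /// Builds the region metadata, applying the primary key encoding from the
    /// region options to the metadata builder.
    ///
    /// # Panics
    /// Panics if `options` or `metadata_builder` is not set.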
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

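    /// Parses the region options from the raw key-value map and sets them.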
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

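    /// Sets the optional replay checkpoint. WAL replay starts from the larger
    /// of this checkpoint and the flushed entry id.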
    pub(crate) fn replay_checkpoint(mut self, replay_checkpoint: Option<u64>) -> Self {
        self.replay_checkpoint = replay_checkpoint;
        self
    }

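    /// Sets a custom WAL entry reader. If it is not set, the opener builds a
    /// reader from the `Wal` instance when the region is opened.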
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

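    /// Validates and sets the region options.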
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

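    /// Sets the cache manager of the region.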
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

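    /// Sets whether to skip WAL replay when opening the region.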
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

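    /// Opens the region if it already exists, otherwise creates it and writes
    /// its initial manifest.
    ///
    /// # Panics
    /// Panics if `options` or `metadata_builder` is not set.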
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
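                // Keep the behavior consistent with the create path: the
                // returned region serves writes, so set it to leader.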
                region.set_role(RegionRole::Leader);

                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
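        // Safety: `build_metadata()` above has already ensured that `options` is set.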
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
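        // Create the manifest manager, which persists the initial metadata of
        // the region.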
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            flushed_entry_id,
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: create_local_file_purger(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
                self.file_ref_manager.clone(),
            ),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: self.stats,
        })
    }

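    /// Opens an existing region in the store.
    ///
    /// Returns an error if the region directory is empty.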
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

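    /// Returns the WAL [Provider] for the region's WAL options, ensuring the
    /// options are compatible with the log store type `S`.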
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.to_string()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

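    /// Tries to open the region. Returns `Ok(None)` if the region manifest
    /// does not exist.
    ///
    /// # Panics
    /// Panics if `options` is not set.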
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = create_local_file_purger(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
            self.file_ref_manager.clone(),
        );
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
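        // Prefer the time window from the region options; fall back to the
        // window recorded in the manifest.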
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        let topic_latest_entry_id = if !self.skip_wal_replay {
            let replay_from_entry_id = self
                .replay_checkpoint
                .unwrap_or_default()
                .max(flushed_entry_id);
            info!(
                "Start replaying memtable at replay_from_entry_id: {} for region {}, manifest version: {}, flushed entry id: {}",
                replay_from_entry_id,
                region_id,
                manifest.manifest_version,
                flushed_entry_id
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                replay_from_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
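            // For a remote WAL, if replay leaves the memtables empty, record
            // the latest entry id of the topic instead.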
            if provider.is_remote_wal()
                && version_control.current().version.memtables.is_empty()
            {
                wal.store().latest_entry_id(&provider).unwrap_or(0)
            } else {
                0
            }
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );

            0
        };
        let now = self.time_provider.current_time_millis();
        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
            written_bytes: Arc::new(AtomicU64::new(0)),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

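    /// Builds the [RegionManifestOptions] for the region from the engine
    /// config and the region options.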
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: config.experimental_manifest_keep_removed_file_count,
                keep_ttl: config.experimental_manifest_keep_removed_file_ttl,
            },
        })
    }
}

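/// Returns the object store named `name`, or the default object store if
/// `name` is `None`.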
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

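/// Loader that reads region metadata from the region manifest.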
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
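    /// Returns a new loader.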
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

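    /// Loads the region metadata from the manifest under `region_dir`.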
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

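    /// Loads the region manifest under `region_dir`. Returns `Ok(None)` if
    /// the manifest does not exist.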
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

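/// Checks that the recovered region metadata matches the expected region id,
/// column metadatas and primary key.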
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

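/// Replays WAL entries after `flushed_entry_id` into the memtables of the
/// region. Returns the last entry id replayed.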
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let now = Instant::now();
    let mut rows_replayed = 0;
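    // Replay starts from the entry right after the flushed entry.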
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx = RegionWriteCtx::new(
            region_id,
            version_control,
            provider.clone(),
            None,
        );
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            ensure!(
                region_write_ctx.push_bulk(OptionOutputTx::none(), part),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

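        // Set the next entry id and write the replayed mutations and bulk
        // parts to the memtables.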
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

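    // Notify the WAL that the region is opened so it can act on the flushed
    // entry id (e.g. purge obsolete entries).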
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, provider: {:?}, rows recovered: {}, replay from entry id: {}, last entry id: {}, total timeseries replayed: {}, elapsed: {:?}",
        region_id, provider, rows_replayed, replay_from_entry_id, last_entry_id, series_count, now.elapsed()
    );
    Ok(last_entry_id)
}

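/// Returns the directory of the region manifest under `region_dir`.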
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}