use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

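/// Builder to create a new [MitoRegion] or open an existing one.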
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    region_dir: String,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}

impl RegionOpener {
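    /// Returns a new opener.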
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        region_dir: &str,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            region_dir: normalize_dir(region_dir),
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
        }
    }

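    /// Sets the metadata builder of the region to create.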
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

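    /// Builds the region metadata from the metadata builder.
    ///
    /// # Panics
    /// Panics if the options or the metadata builder is not set.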
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

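    /// Parses and sets options for the region.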
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

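    /// Sets the [WalEntryReader] to use during WAL replay.
    /// If unset, a reader is created from the WAL when the region is opened.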
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

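    /// Validates and sets options for the region.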
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

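    /// Sets the cache manager for the region.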
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

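    /// Sets whether to skip WAL replay when opening the region.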
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

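    /// Opens the region if it already exists, otherwise writes the region
    /// manifest and creates a new region.
    ///
    /// # Panics
    /// Panics if the metadata builder or the options is not set.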
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let metadata = self.build_metadata()?;
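        // Tries to open the region first to avoid overwriting existing data.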
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                region.set_role(RegionRole::Leader);
                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    self.region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, self.region_dir
                );
            }
        }
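        // No existing region to reuse, creates a new one.
        // Safety: `build_metadata()` above has ensured the options are set.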
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let region_manifest_options = Self::manifest_options(
            config,
            &options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: Arc::new(LocalFilePurger::new(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
            )),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats,
        })
    }

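    /// Opens an existing region. Returns an error if the region directory is
    /// empty.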
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: self.region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

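    /// Returns the WAL [Provider] for the region, ensuring the region's WAL
    /// options are compatible with the log store type in use.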
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.to_string()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

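    /// Tries to open the region, returning `Ok(None)` if the region manifest
    /// does not exist.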
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!("Open region {} with options: {:?}", region_id, self.options);

        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir.clone(),
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = Arc::new(LocalFilePurger::new(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
        ));
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        if !self.skip_wal_replay {
            info!(
                "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
                flushed_entry_id + 1,
                region_id,
                manifest.manifest_version
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                flushed_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}",
                region_id, manifest.manifest_version
            );
        }
        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

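    /// Returns the [RegionManifestOptions] for the region.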
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
        })
    }
}

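/// Returns the object store named `name`, or the default object store if
/// `name` is `None`.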
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

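/// Loader that reads the region metadata from the region manifest.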
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
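    /// Returns a new loader.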
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

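    /// Loads the region metadata from the manifest under `region_dir`.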
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

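    /// Loads the [RegionManifest] under `region_dir`, returning `Ok(None)` if
    /// it does not exist.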
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

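/// Checks whether the recovered region metadata matches the expected region
/// id, column metadatas, and primary key.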
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

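/// Replays the WAL entries after `flushed_entry_id` into the memtable of the
/// region and returns the last entry id applied.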
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let mut rows_replayed = 0;
    let mut last_entry_id = flushed_entry_id;
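    // Entries at or before `flushed_entry_id` are already persisted, so the
    // replay starts from the next entry.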
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx =
            RegionWriteCtx::new(region_id, version_control, provider.clone());
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
    }

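    // Notifies the WAL that the region is opened with its flushed entry id.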
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    info!(
        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
        region_id, rows_replayed, last_entry_id
    );
    Ok(last_entry_id)
}

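/// Returns the directory of the manifest files under `region_dir`.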
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}