use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

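/// Builder that creates a new region or opens an existing one.
///
/// A minimal usage sketch (hypothetical caller; the constructor arguments and
/// the `wal` handle are assumed to come from the engine's worker context):
///
/// ```ignore
/// let region = RegionOpener::new(
///     region_id,
///     "data/region_dir/",
///     memtable_builder_provider,
///     object_store_manager,
///     purge_scheduler,
///     puffin_manager_factory,
///     intermediate_manager,
///     time_provider,
/// )
/// .metadata_builder(metadata_builder)
/// .parse_options(options_map)?
/// .cache(Some(cache_manager))
/// .create_or_open::<RaftEngineLogStore>(&config, &wal)
/// .await?;
/// ```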
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    region_dir: String,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}

impl RegionOpener {
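    /// Returns a new opener for the region under `region_dir`.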
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        region_dir: &str,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            region_dir: normalize_dir(region_dir),
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
        }
    }

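    /// Sets the metadata builder used to build the region metadata on creation.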
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

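    /// Builds the region metadata from the stored builder, applying the
    /// primary key encoding from the region options.
    ///
    /// Panics if `options` or `metadata_builder` is unset, so callers must set
    /// both before invoking this method.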
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

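    /// Parses and sets the region options from a raw key-value map.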
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

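    /// Sets a custom WAL entry reader to use during replay; when unset, a
    /// default reader is created from the WAL handle.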
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

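    /// Validates and sets the region options.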
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

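    /// Sets the cache manager for the region.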
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

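    /// Sets whether to skip WAL replay when opening the region.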
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

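    /// Creates the region if it does not exist, otherwise opens it.
    ///
    /// Tries to open the region first; if data exists under the region
    /// directory, it checks that the recovered metadata matches the expected
    /// metadata and sets the region role to leader. Otherwise it writes a new
    /// manifest and creates a fresh, writable region.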
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let metadata = self.build_metadata()?;
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                region.set_role(RegionRole::Leader);
                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    self.region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, self.region_dir
                );
            }
        }
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let region_manifest_options = Self::manifest_options(
            config,
            &options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: Arc::new(LocalFilePurger::new(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
            )),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats,
        })
    }

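    /// Opens an existing region, returning an error if the region directory is
    /// empty or the recovered region id mismatches.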
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: self.region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

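    /// Returns the WAL provider for the region, ensuring the region's WAL
    /// options are compatible with the log store type `S`.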
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.to_string()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

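    /// Tries to open the region, returning `Ok(None)` if the region directory
    /// contains no manifest. The opened region starts in the follower state,
    /// and the WAL is replayed unless `skip_wal_replay` is set.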
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!("Open region {} with options: {:?}", region_id, self.options);

        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir.clone(),
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = Arc::new(LocalFilePurger::new(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
        ));
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        if !self.skip_wal_replay {
            info!(
                "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
                flushed_entry_id + 1,
                region_id,
                manifest.manifest_version
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                flushed_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}",
                region_id, manifest.manifest_version
            );
        }
        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

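    /// Returns the manifest options for the region under `region_dir`.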
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
        })
    }
}

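/// Returns the object store named `name` from the manager, or the default
/// object store if `name` is `None`.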
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

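/// Loads region metadata from an existing region directory without fully
/// opening the region.
///
/// A minimal usage sketch (hypothetical caller; `region_dir` and
/// `region_options` values are illustrative):
///
/// ```ignore
/// let loader = RegionMetadataLoader::new(config, object_store_manager);
/// if let Some(metadata) = loader.load("data/region_dir/", &region_options).await? {
///     println!("recovered region {}", metadata.region_id);
/// }
/// ```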
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
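    /// Returns a new loader backed by the given config and object store manager.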
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

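    /// Loads the region metadata from the manifest stored under `region_dir`.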
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

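    /// Loads the region manifest from `region_dir`, returning `Ok(None)` if no
    /// manifest exists.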
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

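/// Checks whether the recovered region metadata matches the expected region
/// id, column metadatas, and primary key.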
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

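/// Replays WAL entries starting from `flushed_entry_id + 1` into the region's
/// memtables and returns the last entry id applied.
///
/// Entries at or below `flushed_entry_id` are considered stale: they are
/// tolerated (with a warning) when `allow_stale_entries` is true and rejected
/// otherwise. After replay, `on_region_opened` is invoked with the region id,
/// the flushed entry id, and the provider.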
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let mut rows_replayed = 0;
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!(
                "Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}",
                region_id, flushed_entry_id, entry_id
            );
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx =
            RegionWriteCtx::new(region_id, version_control, provider.clone());
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            region_write_ctx.push_bulk(OptionOutputTx::none(), part);
        }

        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    info!(
        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
        region_id, rows_replayed, last_entry_id
    );
    Ok(last_entry_id)
}

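/// Returns the path of the manifest directory under `region_dir`.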
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}