use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

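/// Builder to create a new [MitoRegion] or open an existing one.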
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}

impl RegionOpener {
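    /// Returns a new opener for the region. Optional parts (metadata, options, cache,
    /// WAL entry reader) are supplied through the builder methods below.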
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
        }
    }

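    /// Sets the builder for the expected region metadata.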
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

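    /// Builds the expected region metadata from the stored builder, applying the primary key
    /// encoding from the region options. Both the options and the metadata builder must have
    /// been set before this is called.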
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

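    /// Parses and validates region options from raw string key-value pairs.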
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

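    /// Sets the WAL entry reader used during replay. When `None`, a reader is created from
    /// the WAL when the region is opened.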
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

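    /// Validates and sets the region options.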
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

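    /// Sets the cache manager for the region.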
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

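    /// Sets whether to skip WAL replay when opening the region.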
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

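    /// Opens the region if the region directory already contains data, checking that the
    /// recovered metadata matches the expected one; otherwise creates the region from the
    /// metadata supplied via `metadata_builder`. The returned region is a writable leader.
    ///
    /// A typical call chain looks roughly like the sketch below; the argument names are
    /// placeholders for values wired up by the caller:
    ///
    /// ```ignore
    /// let region = RegionOpener::new(
    ///     region_id,
    ///     table_dir,
    ///     path_type,
    ///     memtable_builder_provider,
    ///     object_store_manager,
    ///     purge_scheduler,
    ///     puffin_manager_factory,
    ///     intermediate_manager,
    ///     time_provider,
    /// )
    /// .metadata_builder(metadata_builder)
    /// .parse_options(options_map)?
    /// .cache(Some(cache_manager))
    /// .create_or_open(&config, &wal)
    /// .await?;
    /// ```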
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                region.set_role(RegionRole::Leader);
                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        let part_duration = options.compaction.time_window();
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: Arc::new(LocalFilePurger::new(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
            )),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats,
        })
    }

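    /// Opens an existing region. Returns an error if the region directory is empty or the
    /// recovered region id does not match the expected one.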
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

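    /// Returns the WAL [Provider] for the region's WAL options, ensuring the options are
    /// compatible with the log store type `S`.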
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.to_string()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

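    /// Tries to open the region from its manifest. Returns `Ok(None)` if no manifest exists
    /// under the region directory. Unless `skip_wal_replay` is set, WAL entries after the
    /// flushed entry id are replayed into the memtable.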
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = Arc::new(LocalFilePurger::new(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
        ));
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        if !self.skip_wal_replay {
            info!(
                "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
                flushed_entry_id + 1,
                region_id,
                manifest.manifest_version
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                flushed_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );
        }
        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

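    /// Returns the [RegionManifestOptions] built from the engine config and region options.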
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
        })
    }
}

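/// Returns the object store with the given `name` from the manager, or the default object
/// store when `name` is `None`.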
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

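/// Loads region metadata from the region manifest in the object store without opening the
/// region itself.
///
/// Rough usage (sketch; the region dir and options are supplied by the caller):
///
/// ```ignore
/// let loader = RegionMetadataLoader::new(config, object_store_manager);
/// let metadata = loader.load(&region_dir, &region_options).await?;
/// ```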
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

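/// Checks that the recovered region metadata matches the expected region id, column
/// metadatas, and primary key, returning a region-corrupted error otherwise.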
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

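/// Replays WAL entries after `flushed_entry_id` into the memtable and returns the last
/// replayed entry id. Entries at or below `flushed_entry_id` are rejected unless
/// `allow_stale_entries` is set. Once replay finishes, `on_region_opened` is invoked with
/// the flushed entry id.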
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let mut rows_replayed = 0;
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx =
            RegionWriteCtx::new(region_id, version_control, provider.clone());
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            ensure!(
                region_write_ctx.push_bulk(OptionOutputTx::none(), part),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}",
        region_id, rows_replayed, last_entry_id, series_count
    );
    Ok(last_entry_id)
}

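/// Returns the manifest directory under the given region directory, i.e. `{region_dir}/manifest`.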
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}