mito2/region/opener.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region opener.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// Builder to create a new [MitoRegion] or open an existing one.
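///
/// A minimal construction sketch (illustrative only, not compiled; the region dir
/// and the other bindings below are hypothetical):
///
/// ```ignore
/// let opener = RegionOpener::new(
///     region_id,
///     "data/my_region/",
///     memtable_builder_provider,
///     object_store_manager,
///     purge_scheduler,
///     puffin_manager_factory,
///     intermediate_manager,
///     time_provider,
/// )
/// .metadata_builder(metadata_builder)
/// .parse_options(options_map)?
/// .cache(None)
/// .skip_wal_replay(false);
/// ```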
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    region_dir: String,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}

impl RegionOpener {
    /// Returns a new opener.
    // TODO(LFC): Reduce the number of arguments.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        region_dir: &str,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            region_dir: normalize_dir(region_dir),
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
        }
    }

    /// Sets the metadata builder of the region to create.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Builds the region metadata.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    /// - Panics if `metadata_builder` is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses and sets options for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets whether to skip WAL replay.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

    /// Writes the region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
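    ///
    /// A hedged usage sketch (not compiled; `opener`, `config`, and `wal` are
    /// hypothetical bindings):
    ///
    /// ```ignore
    /// // Creates the region if its dir is empty; otherwise opens it and checks that
    /// // the recovered schema matches the metadata we intended to create.
    /// let region = opener.create_or_open(&config, &wal).await?;
    /// ```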
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with the create behavior, set the opened region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);
                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    self.region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, self.region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Creates a manifest manager for this region and writes the region metadata to the manifest file.
        let region_manifest_options = Self::manifest_options(
            config,
            &options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: Arc::new(LocalFilePurger::new(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
            )),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats,
        })
    }

    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: self.region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

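    /// Returns the WAL [Provider] for the region's `wal_options`.
    ///
    /// Also checks that the log store type `S` matches the per-region WAL options,
    /// so a region cannot silently switch between the `raft_engine` and `kafka`
    /// WAL providers.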
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.to_string()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

    /// Tries to open the region and returns `None` if the region directory is empty.
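    ///
    /// Opening loads the region manifest, rebuilds the version from the manifest
    /// files, and replays the WAL into the memtable unless `skip_wal_replay` is set.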
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!("Open region {} with options: {:?}", region_id, self.options);

        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir.clone(),
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = Arc::new(LocalFilePurger::new(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
        ));
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        // Use the compaction time window from the manifest if the region doesn't
        // provide the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        if !self.skip_wal_replay {
            info!(
                "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
                flushed_entry_id + 1,
                region_id,
                manifest.manifest_version
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                flushed_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}",
                region_id, manifest.manifest_version
            );
        }
        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            // Region is always opened in read-only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

    /// Returns the manifest options for the region.
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            // We don't allow users to set the compression algorithm as we use it as a file suffix.
            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
        })
    }
}

/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
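///
/// A hedged example (not compiled; `manager` is a hypothetical [ObjectStoreManagerRef]
/// with a store registered under the name "s3"):
///
/// ```ignore
/// // Looks up a named store; returns an `ObjectStoreNotFound` error if it is missing.
/// let s3_store = get_object_store(&Some("s3".to_string()), &manager)?;
/// // Falls back to the default object store when no name is given.
/// let default_store = get_object_store(&None, &manager)?;
/// ```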
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// A loader that loads region metadata from a region dir.
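///
/// A hedged usage sketch (not compiled; the region dir below is hypothetical):
///
/// ```ignore
/// let loader = RegionMetadataLoader::new(config, object_store_manager);
/// // Returns `None` if the region dir contains no manifest.
/// let metadata = loader
///     .load("data/greptime/public/1024/1024_0000000000/", &region_options)
///     .await?;
/// ```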
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region from the region dir.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region from the region dir.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

/// Checks whether the recovered region has the same schema as the region to create.
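///
/// Returns a `RegionCorrupted` error if the recovered region id, column metadatas,
/// or primary key differ from the expected ones.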
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays the mutations from the WAL and inserts them into the memtable of the given region.
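///
/// Replay starts from `flushed_entry_id + 1` and returns the last entry id applied.
/// A hedged invocation sketch (not compiled; all bindings are hypothetical, and the
/// closure stands in for the callback normally obtained from `Wal::on_region_opened`):
///
/// ```ignore
/// let last_entry_id = replay_memtable(
///     &provider,
///     wal_entry_reader,
///     region_id,
///     flushed_entry_id,
///     &version_control,
///     /* allow_stale_entries */ false,
///     |_region_id, _entry_id, _provider| Box::pin(async { Ok(()) }),
/// )
/// .await?;
/// ```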
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let mut rows_replayed = 0;
    // The last entry id starts from the flushed entry id since there might be no
    // data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx =
            RegionWriteCtx::new(region_id, version_control, provider.clone());
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

        // Set next_entry_id and write to the memtable.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    info!(
        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
        region_id, rows_replayed, last_entry_id
    );
    Ok(last_entry_id)
}

/// Returns the directory of the manifest files under the region dir.
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}