mito2/region/opener.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region opener.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// Builder to create a new [MitoRegion] or open an existing one.
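///
/// A rough usage sketch (argument values are illustrative, not real call sites):
///
/// ```ignore
/// let region = RegionOpener::new(
///     region_id,
///     "data/table/region_dir/", // illustrative region dir
///     memtable_builder_provider,
///     object_store_manager,
///     purge_scheduler,
///     puffin_manager_factory,
///     intermediate_manager,
///     time_provider,
/// )
/// .metadata_builder(metadata_builder)
/// .parse_options(options_map)?
/// .cache(Some(cache_manager))
/// .create_or_open(&config, &wal)
/// .await?;
/// ```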
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    region_dir: String,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}

impl RegionOpener {
    /// Returns a new opener.
    // TODO(LFC): Reduce the number of arguments.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        region_dir: &str,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            region_dir: normalize_dir(region_dir),
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
        }
    }

    /// Sets the metadata builder of the region to create.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Builds the region metadata.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    /// - Panics if `metadata_builder` is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses and sets options for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets whether to skip WAL replay.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

    /// Writes the region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
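    ///
    /// If an existing region is opened successfully, its metadata is validated
    /// against the metadata built from `metadata_builder` and the region is
    /// returned with [RegionRole::Leader]; otherwise a new region is created and
    /// its metadata is written to the manifest.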
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with the Create behavior, set the opened region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);
                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    self.region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, self.region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Creates a manifest manager for this region and writes the region metadata to the manifest file.
        let region_manifest_options = Self::manifest_options(
            config,
            &options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: Arc::new(LocalFilePurger::new(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
            )),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats,
        })
    }

    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: self.region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
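        // The generic log store type `S` is the datanode's global WAL implementation.
        // If the region's per-region WAL option doesn't match it, the WAL provider has
        // changed since the region was created, so we refuse to build a provider.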
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.to_string()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

    /// Tries to open the region and returns `None` if the region directory is empty.
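    ///
    /// Opening loads the region manifest, rebuilds the version from the files it
    /// records, and replays the WAL from `flushed_entry_id + 1` unless
    /// `skip_wal_replay` is set.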
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!("Open region {} with options: {:?}", region_id, self.options);

        let access_layer = Arc::new(AccessLayer::new(
            self.region_dir.clone(),
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = Arc::new(LocalFilePurger::new(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
        ));
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        // Use the compaction time window in the manifest if the region doesn't provide
        // the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        if !self.skip_wal_replay {
            info!(
                "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
                flushed_entry_id + 1,
                region_id,
                manifest.manifest_version
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                flushed_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}",
                region_id, manifest.manifest_version
            );
        }
        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            // Region is always opened in read-only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

    /// Returns new manifest options for the region.
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            // We don't allow users to set the compression algorithm as we use it as a file suffix.
            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
        })
    }
}

/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
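///
/// A minimal sketch; the `"s3"` store name is an assumption and must already be
/// registered in the manager:
///
/// ```ignore
/// let store = get_object_store(&Some("s3".to_string()), &object_store_manager)?;
/// let default_store = get_object_store(&None, &object_store_manager)?;
/// ```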
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// A loader for loading metadata from a region dir.
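///
/// A rough usage sketch (the region dir is illustrative):
///
/// ```ignore
/// let loader = RegionMetadataLoader::new(config, object_store_manager);
/// let metadata = loader.load("data/table/region_dir/", &region_options).await?;
/// ```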
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region from the region dir.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region from the region dir.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

/// Checks whether the recovered region has the same schema as the region to create.
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays the mutations from the WAL and inserts them into the memtables of the given region.
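///
/// Entries are read starting from `flushed_entry_id + 1`. Row mutations and bulk
/// entries are pushed into a [RegionWriteCtx] and written to the memtables, and
/// `on_region_opened` is invoked with the flushed entry id once replay finishes.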
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let mut rows_replayed = 0;
    // Last entry id should start from flushed entry id since there might be no
    // data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx =
            RegionWriteCtx::new(region_id, version_control, provider.clone());
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

        for bulk_entry in entry.bulk_entries {
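            // Bulk entries are decoded into `BulkPart`s and staged for the bulk write path.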
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            region_write_ctx.push_bulk(OptionOutputTx::none(), part);
        }

        // Sets `next_entry_id` and writes to the memtables.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    info!(
        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}",
        region_id, rows_replayed, last_entry_id
    );
    Ok(last_entry_id)
}

/// Returns the directory of the manifest files under the given region dir.
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}