mito2/region/opener.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Region opener.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;

use common_telemetry::{debug, error, info, warn};
use common_wal::options::WalOptions;
use futures::future::BoxFuture;
use futures::StreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{
    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
    EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
    Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
    ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};

/// Builder to create a new [MitoRegion] or open an existing one.
pub(crate) struct RegionOpener {
    region_id: RegionId,
    metadata_builder: Option<RegionMetadataBuilder>,
    memtable_builder_provider: MemtableBuilderProvider,
    object_store_manager: ObjectStoreManagerRef,
    table_dir: String,
    path_type: PathType,
    purge_scheduler: SchedulerRef,
    options: Option<RegionOptions>,
    cache_manager: Option<CacheManagerRef>,
    skip_wal_replay: bool,
    puffin_manager_factory: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
    time_provider: TimeProviderRef,
    stats: ManifestStats,
    wal_entry_reader: Option<Box<dyn WalEntryReader>>,
}

impl RegionOpener {
    /// Returns a new opener.
    // TODO(LFC): Reduce the number of arguments.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        region_id: RegionId,
        table_dir: &str,
        path_type: PathType,
        memtable_builder_provider: MemtableBuilderProvider,
        object_store_manager: ObjectStoreManagerRef,
        purge_scheduler: SchedulerRef,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        time_provider: TimeProviderRef,
    ) -> RegionOpener {
        RegionOpener {
            region_id,
            metadata_builder: None,
            memtable_builder_provider,
            object_store_manager,
            table_dir: normalize_dir(table_dir),
            path_type,
            purge_scheduler,
            options: None,
            cache_manager: None,
            skip_wal_replay: false,
            puffin_manager_factory,
            intermediate_manager,
            time_provider,
            stats: Default::default(),
            wal_entry_reader: None,
        }
    }

    /// Sets metadata builder of the region to create.
    pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
        self.metadata_builder = Some(builder);
        self
    }

    /// Computes the region directory from `table_dir` and `region_id`.
    fn region_dir(&self) -> String {
        region_dir_from_table_dir(&self.table_dir, self.region_id, self.path_type)
    }

    /// Builds the region metadata.
    ///
    /// # Panics
    /// - Panics if `options` is not set.
    /// - Panics if `metadata_builder` is not set.
    fn build_metadata(&mut self) -> Result<RegionMetadata> {
        let options = self.options.as_ref().unwrap();
        let mut metadata_builder = self.metadata_builder.take().unwrap();
        metadata_builder.primary_key_encoding(options.primary_key_encoding());
        metadata_builder.build().context(InvalidMetadataSnafu)
    }

    /// Parses and sets options for the region.
    pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
        self.options(RegionOptions::try_from(&options)?)
    }

    /// If a [WalEntryReader] is set, the [RegionOpener] will use it instead of
    /// constructing a new one from scratch.
    pub(crate) fn wal_entry_reader(
        mut self,
        wal_entry_reader: Option<Box<dyn WalEntryReader>>,
    ) -> Self {
        self.wal_entry_reader = wal_entry_reader;
        self
    }

    /// Sets options for the region.
    pub(crate) fn options(mut self, options: RegionOptions) -> Result<Self> {
        options.validate()?;
        self.options = Some(options);
        Ok(self)
    }

    /// Sets the cache manager for the region.
    pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
        self.cache_manager = cache_manager;
        self
    }

    /// Sets the `skip_wal_replay`.
    pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
        self.skip_wal_replay = skip;
        self
    }

    /// Writes region manifest and creates a new region if it does not exist.
    /// Opens the region if it already exists.
    ///
    /// # Panics
    /// - Panics if `metadata_builder` is not set.
    /// - Panics if `options` is not set.
    pub(crate) async fn create_or_open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let metadata = self.build_metadata()?;
        // Tries to open the region.
        match self.maybe_open(config, wal).await {
            Ok(Some(region)) => {
                let recovered = region.metadata();
                // Checks the schema of the region.
                let expect = &metadata;
                check_recovered_region(
                    &recovered,
                    expect.region_id,
                    &expect.column_metadatas,
                    &expect.primary_key,
                )?;
                // To keep consistency with the create behavior, set the opened region to RegionRole::Leader.
                region.set_role(RegionRole::Leader);
                return Ok(region);
            }
            Ok(None) => {
                debug!(
                    "No data under directory {}, region_id: {}",
                    region_dir, self.region_id
                );
            }
            Err(e) => {
                warn!(e;
                    "Failed to open region {} before creating it, region_dir: {}",
                    self.region_id, region_dir
                );
            }
        }
        // Safety: must be set before calling this method.
        let options = self.options.take().unwrap();
        let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
        let provider = self.provider::<S>(&options.wal_options)?;
        let metadata = Arc::new(metadata);
        // Creates a manifest manager for this region and writes the region metadata to the manifest file.
        let region_manifest_options =
            Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
        let manifest_manager = RegionManifestManager::new(
            metadata.clone(),
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?;

        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            options.memtable.as_ref(),
            options.need_dedup(),
            options.merge_mode(),
        );
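        // The mutable memtable is partitioned by the compaction time window, if one is set.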
        let part_duration = options.compaction.time_window();
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));

        debug!("Create region {} with options: {:?}", region_id, options);

        let version = VersionBuilder::new(metadata, mutable)
            .options(options)
            .build();
        let version_control = Arc::new(VersionControl::new(version));
        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory,
            self.intermediate_manager,
        ));
        let now = self.time_provider.current_time_millis();

        Ok(MitoRegion {
            region_id,
            version_control,
            access_layer: access_layer.clone(),
            // Region is writable after it is created.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Leader(RegionLeaderState::Writable),
            )),
            file_purger: Arc::new(LocalFilePurger::new(
                self.purge_scheduler,
                access_layer,
                self.cache_manager,
            )),
            provider,
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats,
        })
    }

    /// Opens an existing region in read-only mode.
    ///
    /// Returns an error if the region doesn't exist.
    pub(crate) async fn open<S: LogStore>(
        mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<MitoRegion> {
        let region_id = self.region_id;
        let region_dir = self.region_dir();
        let region = self
            .maybe_open(config, wal)
            .await?
            .context(EmptyRegionDirSnafu {
                region_id,
                region_dir: &region_dir,
            })?;

        ensure!(
            region.region_id == self.region_id,
            RegionCorruptedSnafu {
                region_id: self.region_id,
                reason: format!(
                    "recovered region has different region id {}",
                    region.region_id
                ),
            }
        );

        Ok(region)
    }

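    /// Returns the WAL [Provider] for the region according to its WAL options,
    /// ensuring the per-region WAL provider is compatible with the log store in use.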
    fn provider<S: LogStore>(&self, wal_options: &WalOptions) -> Result<Provider> {
        match wal_options {
            WalOptions::RaftEngine => {
                ensure!(
                    TypeId::of::<RaftEngineLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`kafka`",
                        region: "`raft_engine`",
                    }
                );
                Ok(Provider::raft_engine_provider(self.region_id.as_u64()))
            }
            WalOptions::Kafka(options) => {
                ensure!(
                    TypeId::of::<KafkaLogStore>() == TypeId::of::<S>(),
                    error::IncompatibleWalProviderChangeSnafu {
                        global: "`raft_engine`",
                        region: "`kafka`",
                    }
                );
                Ok(Provider::kafka_provider(options.topic.to_string()))
            }
            WalOptions::Noop => Ok(Provider::noop_provider()),
        }
    }

    /// Tries to open the region and returns `None` if the region directory is empty.
    async fn maybe_open<S: LogStore>(
        &mut self,
        config: &MitoConfig,
        wal: &Wal<S>,
    ) -> Result<Option<MitoRegion>> {
        let region_options = self.options.as_ref().unwrap().clone();

        let region_manifest_options = Self::manifest_options(
            config,
            &region_options,
            &self.region_dir(),
            &self.object_store_manager,
        )?;
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            self.stats.total_manifest_size.clone(),
            self.stats.manifest_version.clone(),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        let metadata = manifest.metadata.clone();

        let region_id = self.region_id;
        let provider = self.provider::<S>(&region_options.wal_options)?;
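        // Reuses the injected WAL entry reader if one was set on the opener;
        // otherwise constructs a new reader from the WAL for this region.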
        let wal_entry_reader = self
            .wal_entry_reader
            .take()
            .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
        let on_region_opened = wal.on_region_opened();
        let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;

        debug!(
            "Open region {} at {} with options: {:?}",
            region_id, self.table_dir, self.options
        );

        let access_layer = Arc::new(AccessLayer::new(
            self.table_dir.clone(),
            self.path_type,
            object_store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        ));
        let file_purger = Arc::new(LocalFilePurger::new(
            self.purge_scheduler.clone(),
            access_layer.clone(),
            self.cache_manager.clone(),
        ));
        let memtable_builder = self.memtable_builder_provider.builder_for_options(
            region_options.memtable.as_ref(),
            region_options.need_dedup(),
            region_options.merge_mode(),
        );
        // Use the compaction time window in the manifest if the region doesn't
        // provide the time window option.
        let part_duration = region_options
            .compaction
            .time_window()
            .or(manifest.compaction_time_window);
        // Initial memtable id is 0.
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            memtable_builder.clone(),
            0,
            part_duration,
        ));
        let version = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger.clone(), manifest.files.values().cloned())
            .flushed_entry_id(manifest.flushed_entry_id)
            .flushed_sequence(manifest.flushed_sequence)
            .truncated_entry_id(manifest.truncated_entry_id)
            .compaction_time_window(manifest.compaction_time_window)
            .options(region_options)
            .build();
        let flushed_entry_id = version.flushed_entry_id;
        let version_control = Arc::new(VersionControl::new(version));
        if !self.skip_wal_replay {
            info!(
                "Start replaying memtable at flushed_entry_id + 1: {} for region {}, manifest version: {}",
                flushed_entry_id + 1,
                region_id,
                manifest.manifest_version
            );
            replay_memtable(
                &provider,
                wal_entry_reader,
                region_id,
                flushed_entry_id,
                &version_control,
                config.allow_stale_entries,
                on_region_opened,
            )
            .await?;
        } else {
            info!(
                "Skip the WAL replay for region: {}, manifest version: {}, flushed_entry_id: {}",
                region_id, manifest.manifest_version, flushed_entry_id
            );
        }
        let now = self.time_provider.current_time_millis();

        let region = MitoRegion {
            region_id: self.region_id,
            version_control,
            access_layer,
            // Region is always opened in read-only mode.
            manifest_ctx: Arc::new(ManifestContext::new(
                manifest_manager,
                RegionRoleState::Follower,
            )),
            file_purger,
            provider: provider.clone(),
            last_flush_millis: AtomicI64::new(now),
            last_compaction_millis: AtomicI64::new(now),
            time_provider: self.time_provider.clone(),
            topic_latest_entry_id: AtomicU64::new(0),
            memtable_builder,
            stats: self.stats.clone(),
        };
        Ok(Some(region))
    }

    /// Returns new manifest options for the region.
    fn manifest_options(
        config: &MitoConfig,
        options: &RegionOptions,
        region_dir: &str,
        object_store_manager: &ObjectStoreManagerRef,
    ) -> Result<RegionManifestOptions> {
        let object_store = get_object_store(&options.storage, object_store_manager)?;
        Ok(RegionManifestOptions {
            manifest_dir: new_manifest_dir(region_dir),
            object_store,
            // We don't allow users to set the compression algorithm as we use it as a file suffix.
            // Currently, the manifest storage doesn't have good support for changing compression algorithms.
            compress_type: manifest_compress_type(config.compress_manifest),
            checkpoint_distance: config.manifest_checkpoint_distance,
        })
    }
}

/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    if let Some(name) = name {
        Ok(object_store_manager
            .find(name)
            .with_context(|| ObjectStoreNotFoundSnafu {
                object_store: name.to_string(),
            })?
            .clone())
    } else {
        Ok(object_store_manager.default_object_store().clone())
    }
}

/// A loader that loads region metadata from a region dir.
pub struct RegionMetadataLoader {
    config: Arc<MitoConfig>,
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Creates a new `RegionMetadataLoader`.
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region from the region dir.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest = self.load_manifest(region_dir, region_options).await?;
        Ok(manifest.map(|m| m.metadata.clone()))
    }

    /// Loads the manifest of the region from the region dir.
    pub async fn load_manifest(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<Arc<RegionManifest>>> {
        let region_manifest_options = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;
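        // Opens the manifest manager with fresh counters since this loader only
        // reads the manifest and doesn't track region statistics.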
        let Some(manifest_manager) = RegionManifestManager::open(
            region_manifest_options,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
        .await?
        else {
            return Ok(None);
        };

        let manifest = manifest_manager.manifest();
        Ok(Some(manifest))
    }
}

/// Checks whether the recovered region has the same schema as the region to create.
pub(crate) fn check_recovered_region(
    recovered: &RegionMetadata,
    region_id: RegionId,
    column_metadatas: &[ColumnMetadata],
    primary_key: &[ColumnId],
) -> Result<()> {
    if recovered.region_id != region_id {
        error!(
            "Recovered region {}, expect region {}",
            recovered.region_id, region_id
        );
        return RegionCorruptedSnafu {
            region_id,
            reason: format!(
                "recovered metadata has different region id {}",
                recovered.region_id
            ),
        }
        .fail();
    }
    if recovered.column_metadatas != column_metadatas {
        error!(
            "Unexpected schema in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.column_metadatas, column_metadatas
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different schema",
        }
        .fail();
    }
    if recovered.primary_key != primary_key {
        error!(
            "Unexpected primary key in recovered region {}, recovered: {:?}, expect: {:?}",
            recovered.region_id, recovered.primary_key, primary_key
        );

        return RegionCorruptedSnafu {
            region_id,
            reason: "recovered metadata has different primary key",
        }
        .fail();
    }

    Ok(())
}

/// Replays the mutations from the WAL and inserts them into the memtable of the given region.
pub(crate) async fn replay_memtable<F>(
    provider: &Provider,
    mut wal_entry_reader: Box<dyn WalEntryReader>,
    region_id: RegionId,
    flushed_entry_id: EntryId,
    version_control: &VersionControlRef,
    allow_stale_entries: bool,
    on_region_opened: F,
) -> Result<EntryId>
where
    F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> + Send,
{
    let mut rows_replayed = 0;
    // Last entry id should start from flushed entry id since there might be no
    // data in the WAL.
    let mut last_entry_id = flushed_entry_id;
    let replay_from_entry_id = flushed_entry_id + 1;

    let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?;
    while let Some(res) = wal_stream.next().await {
        let (entry_id, entry) = res?;
        if entry_id <= flushed_entry_id {
            warn!("Stale WAL entries read during replay, region id: {}, flushed entry id: {}, entry id read: {}", region_id, flushed_entry_id, entry_id);
            ensure!(
                allow_stale_entries,
                StaleLogEntrySnafu {
                    region_id,
                    flushed_entry_id,
                    unexpected_entry_id: entry_id,
                }
            );
        }
        last_entry_id = last_entry_id.max(entry_id);

        let mut region_write_ctx =
            RegionWriteCtx::new(region_id, version_control, provider.clone());
        for mutation in entry.mutations {
            rows_replayed += mutation
                .rows
                .as_ref()
                .map(|rows| rows.rows.len())
                .unwrap_or(0);
            region_write_ctx.push_mutation(
                mutation.op_type,
                mutation.rows,
                mutation.write_hint,
                OptionOutputTx::none(),
            );
        }

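        // Replays bulk WAL entries: converts each into a `BulkPart` and pushes it
        // into the write context alongside the row mutations above.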
        for bulk_entry in entry.bulk_entries {
            let part = BulkPart::try_from(bulk_entry)?;
            rows_replayed += part.num_rows();
            ensure!(
                region_write_ctx.push_bulk(OptionOutputTx::none(), part),
                RegionCorruptedSnafu {
                    region_id,
                    reason: "unable to replay memtable with bulk entries",
                }
            );
        }

        // Set next_entry_id and write to the memtable.
        region_write_ctx.set_next_entry_id(last_entry_id + 1);
        region_write_ctx.write_memtable().await;
        region_write_ctx.write_bulk().await;
    }

    // TODO(weny): We need to update `flushed_entry_id` in the region manifest
    // to avoid reading potentially incomplete entries in the future.
    (on_region_opened)(region_id, flushed_entry_id, provider).await?;

    let series_count = version_control.current().series_count();
    info!(
        "Replay WAL for region: {}, rows recovered: {}, last entry id: {}, total timeseries replayed: {}",
        region_id, rows_replayed, last_entry_id, series_count
    );
    Ok(last_entry_id)
}

/// Returns the directory to the manifest files.
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
    join_dir(region_dir, "manifest")
}