mito2/engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region engine.

#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use store_api::ManifestVersion;
use tokio::sync::{oneshot, Semaphore};

use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
    build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";

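/// Builder for [MitoEngine].
///
/// A minimal usage sketch (hypothetical values; assumes the caller already has a
/// `log_store`, `object_store_manager`, `schema_metadata_manager`, `file_ref_manager`
/// and plugins at hand, mirroring what [MitoEngine::new] does internally):
///
/// ```ignore
/// let engine = MitoEngineBuilder::new(
///     "/path/to/data_home",
///     MitoConfig::default(),
///     log_store,
///     object_store_manager,
///     schema_metadata_manager,
///     file_ref_manager,
///     Plugins::new(),
/// )
/// .try_build()
/// .await?;
/// ```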
pub struct MitoEngineBuilder<'a, S: LogStore> {
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    plugins: Plugins,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}

/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    /// Returns a new [MitoEngine] with the given `config`, `log_store` and `object_store_manager`.
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
        );
        builder.try_build().await
    }

    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }

    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }

    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }
    /// Returns true if the specific region is opening.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    /// Returns the region disk/memory statistic.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    /// Returns primary key encoding of the region.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

    /// Handles a substrait query and returns a stream of record batches.
    ///
    /// Note that the output stream's ordering is not guaranteed. If order
    /// matters, please use [`scanner`] to build a [`Scanner`] to consume.
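    ///
    /// A minimal sketch of unordered scanning (hypothetical `engine` and `region_id`;
    /// a default/empty [`ScanRequest`] is assumed for illustration):
    ///
    /// ```ignore
    /// // Scan the whole region; record batches may arrive in any order.
    /// let stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// ```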
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    /// Scan [`Batch`]es by [`ScanRequest`].
    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    /// Returns a scanner to scan for `request`.
    async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    /// Scans a region.
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

    /// Edits the region's metadata by [RegionEdit] directly. Use with care.
    /// Currently we only allow adding files to the region (the [RegionEdit] struct may only contain a non-empty "files_to_add" field).
    /// Any other region editing intent will result in an "invalid request" error.
    /// Also note that if a region is to be edited directly, we MUST not write data to it thereafter.
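    ///
    /// A minimal sketch of a valid edit (hypothetical `engine`, `region_id` and `file_meta`;
    /// only `files_to_add` is populated, as required by [is_valid_region_edit]):
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     timestamp_ms: None,
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```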
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

    /// Finds the current version's memtable and SST stats by `region_id`.
    /// The stats must be collected in one place at one time to ensure data consistency.
    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }

    /// Lists all SSTs from the manifest of all regions in the engine.
    pub fn all_ssts_from_manifest(&self) -> impl Iterator<Item = ManifestSstEntry> + use<'_> {
        self.inner
            .workers
            .all_regions()
            .flat_map(|region| region.manifest_sst_entries())
    }

    /// Lists all SSTs from the storage layer of all regions in the engine.
    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
        let regions = self.inner.workers.all_regions();

        let mut layers_distinct_table_dirs = HashMap::new();
        for region in regions {
            let table_dir = region.access_layer.table_dir();
            if !layers_distinct_table_dirs.contains_key(table_dir) {
                layers_distinct_table_dirs
                    .insert(table_dir.to_string(), region.access_layer.clone());
            }
        }

        stream::iter(layers_distinct_table_dirs)
            .map(|(_, access_layer)| access_layer.storage_sst_entries())
            .flatten()
    }
}

/// Checks whether the region edit is valid. Only adding files to the region is considered valid now.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                timestamp_ms: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            }
        )
}

/// Inner struct of [MitoEngine].
struct EngineInner {
    /// Region workers group.
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// The Wal raw entry reader.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// Returns the requests grouped by topic ([TopicGroupedRegionOpenRequests]) and the remaining requests.
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    /// Stop the inner engine.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    /// Get metadata of a region.
    ///
    /// Returns error if the region doesn't exist.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Waits for entries distribution.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Waits for worker returns.
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            // Safety: provider is kafka provider.
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    /// Handles a [RegionRequest] and returns its execution result.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        Ok(Some(region.find_committed_sequence()))
    }

    /// Handles the scan `request` and returns a [ScanRegion].
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        let version = region.version();
        // Get cache.
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start)
        // TODO(yingwen): Enable it after flat format is supported.
        .with_flat_format(false);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    /// Sets the [`RegionRole`] of the region.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    /// Sets read-only for a region and ensures there are no more writes to the region after it returns.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        // Note: it acquires the mutable ownership to ensure no other threads are writing to the
        // region, therefore we submit the request to the worker.
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        // TODO(weny): add metrics.
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    async fn get_last_seq_num(
        &self,
        region_id: RegionId,
    ) -> Result<Option<SequenceNumber>, BoxedError> {
        self.inner
            .get_last_seq_num(region_id)
            .map_err(BoxedError::new)
    }

    /// Retrieve region's metadata.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stop the engine.
    ///
    /// Stopping the engine doesn't stop the underlying log store as other components might
    /// still use it. (When no other components are referencing the log store, it will
    /// automatically shut down.)
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl MitoEngine {
    fn handle_alter_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        if let Some(statistic) = self.region_statistic(region_id) {
            Self::encode_manifest_info_to_extensions(
                &region_id,
                statistic.manifest,
                &mut response.extensions,
            )?;
        }
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }

    fn handle_create_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }
}

// Test methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    file_ref_manager,
                    time_provider,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    /// Returns the purge scheduler.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: has only "files_to_add"
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        // Invalid: "files_to_add" is empty
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: "files_to_remove" is not empty
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: other fields are not all "None"s
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
        };
        assert!(!is_valid_region_edit(&edit));
    }
}