mito2/engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region engine.

#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod bump_committed_sequence_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_corrupt;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";

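/// Builder for [MitoEngine].
///
/// A minimal usage sketch (illustrative only; it assumes the log store, object store
/// manager and the other handles have already been constructed elsewhere):
///
/// ```ignore
/// let engine = MitoEngineBuilder::new(
///     data_home,
///     config,
///     log_store,
///     object_store_manager,
///     schema_metadata_manager,
///     file_ref_manager,
///     partition_expr_fetcher,
///     plugins,
/// )
/// .try_build()
/// .await?;
/// ```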
pub struct MitoEngineBuilder<'a, S: LogStore> {
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    plugins: Plugins,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            partition_expr_fetcher,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher.clone(),
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}

/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    /// Returns a new [MitoEngine] with the specific `config`, `log_store` and `object_store_manager`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
            plugins,
        );
        builder.try_build().await
    }

    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }

    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }

    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    /// Returns the region disk/memory statistic.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    /// Returns primary key encoding of the region.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

    /// Handles a substrait query and returns a stream of record batches.
    ///
    /// Notice that the output stream's ordering is not guaranteed. If ordering
    /// matters, please use [`scanner`] to build a [`Scanner`] to consume.
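    ///
    /// A minimal usage sketch (illustrative only; it assumes a `ScanRequest` has been
    /// built elsewhere and that `futures::StreamExt` is in scope):
    ///
    /// ```ignore
    /// let mut stream = engine.scan_to_stream(region_id, scan_request).await?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // Process the record batch...
    /// }
    /// ```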
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    /// Scan [`Batch`]es by [`ScanRequest`].
    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    /// Returns a scanner to scan for `request`.
    async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    /// Scans a region.
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

    /// Edits the region's metadata by [RegionEdit] directly. Use with care.
    /// Currently we only allow adding files to the region (the [RegionEdit] struct can only contain a non-empty "files_to_add" field).
    /// Any other region editing intention will result in an "invalid request" error.
    /// Also note that if a region is to be edited directly, we MUST not write data to it thereafter.
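    ///
    /// A hypothetical sketch of a valid edit (illustrative only; `file_meta`, `engine`
    /// and `region_id` are placeholders, not from the original source):
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     timestamp_ms: None,
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    ///     committed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```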
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

    /// Finds the current version's memtable and SST stats by `region_id`.
    /// The stats must be collected in one place at one time to ensure data consistency.
    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }

    /// Lists all SSTs from the manifest of all regions in the engine.
    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut results = Vec::new();
        for region in regions {
            let mut entries = region.manifest_sst_entries().await;
            for e in &mut entries {
                e.node_id = node_id;
            }
            results.extend(entries);
        }

        results
    }

    /// Lists all SSTs from the storage layer of all regions in the engine.
    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut layers_distinct_table_dirs = HashMap::new();
        for region in regions {
            let table_dir = region.access_layer.table_dir();
            if !layers_distinct_table_dirs.contains_key(table_dir) {
                layers_distinct_table_dirs
                    .insert(table_dir.to_string(), region.access_layer.clone());
            }
        }

        stream::iter(layers_distinct_table_dirs)
            .map(|(_, access_layer)| access_layer.storage_sst_entries())
            .flatten()
            .map(move |entry| {
                entry.map(move |mut entry| {
                    entry.node_id = node_id;
                    entry
                })
            })
    }
}

/// Checks whether the region edit is valid. Only adding files to the region is considered valid now.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                timestamp_ms: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                ..
            }
        )
}

/// Inner struct of [MitoEngine].
struct EngineInner {
    /// Region workers group.
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// The WAL raw entry reader.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// Returns requests ([TopicGroupedRegionOpenRequests]) grouped by topic, plus the remaining requests.
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    /// Stop the inner engine.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    /// Get metadata of a region.
    ///
    /// Returns error if the region doesn't exist.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Waits for entries distribution.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Waits for the workers to return responses.
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            // Safety: provider is kafka provider.
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    /// Handles a [RegionRequest] and returns its executed result.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    /// Returns the sequence of latest committed data.
    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
        // Reading a region doesn't need to go through the region worker thread.
        self.find_region(region_id)
            .map(|r| r.find_committed_sequence())
    }

    /// Handles the scan `request` and returns a [ScanRegion].
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        let version = region.version();
        // Get cache.
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start)
        .with_flat_format(self.config.default_experimental_flat_format);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    /// Converts the region to the given [`RegionRole`].
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    /// Sets read-only for a region and ensures no more writes to the region after it returns.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        // Note: it acquires mutable ownership of the region to ensure no other threads can write to it.
        // Therefore, we submit the request to the worker.
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        // TODO(weny): add metrics.
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError> {
        self.inner
            .get_committed_sequence(region_id)
            .map_err(BoxedError::new)
    }

    /// Retrieves the region's metadata.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stops the engine.
    ///
    /// Stopping the engine doesn't stop the underlying log store, as other components might
    /// still use it. (When no other components are referencing the log store, it will
    /// automatically shut down.)
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl MitoEngine {
    fn handle_alter_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        if let Some(statistic) = self.region_statistic(region_id) {
            Self::encode_manifest_info_to_extensions(
                &region_id,
                statistic.manifest,
                &mut response.extensions,
            )?;
        }
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }

    fn handle_create_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }
}

// Test methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    file_ref_manager,
                    time_provider,
                    partition_expr_fetcher,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    /// Returns the purge scheduler.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: has only "files_to_add"
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        // Invalid: "files_to_add" is empty
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: "files_to_remove" is not empty
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: other fields are not all "None"s
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
    }
}