mito2/engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region engine.

#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use store_api::ManifestVersion;
use tokio::sync::{oneshot, Semaphore};

use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
use crate::wal::entry_distributor::{
    build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";

pub struct MitoEngineBuilder<'a, S: LogStore> {
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    plugins: Plugins,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            plugins,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}

/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    /// Returns a new [MitoEngine] with the given `config`, `log_store` and `object_store_manager`.
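    ///
    /// # Example
    ///
    /// A hedged construction sketch (marked `ignore`; `log_store`,
    /// `object_store_manager` and `schema_metadata_manager` are assumed to be
    /// built elsewhere, and `Plugins::default()` is an assumption):
    ///
    /// ```ignore
    /// let engine = MitoEngine::new(
    ///     "/path/to/data_home",
    ///     MitoConfig::default(),
    ///     log_store,
    ///     object_store_manager,
    ///     schema_metadata_manager,
    ///     Plugins::default(),
    /// )
    /// .await?;
    /// ```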
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            plugins,
        );
        builder.try_build().await
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    /// Returns the region disk/memory statistic.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    /// Returns primary key encoding of the region.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

    /// Handles a substrait query and returns a stream of record batches.
    ///
    /// Note that the ordering of the output stream is not guaranteed. If the order
    /// matters, use [`scanner`] to build a [`Scanner`] to consume instead.
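    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore`; it assumes an already running
    /// `engine` and an existing `region_id`):
    ///
    /// ```ignore
    /// use store_api::storage::ScanRequest;
    ///
    /// let mut stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// // Drain the record batches; their ordering is not guaranteed.
    /// while let Some(batch) = futures::StreamExt::next(&mut stream).await {
    ///     let _batch = batch?;
    /// }
    /// ```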
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    /// Scans [`Batch`]es by [`ScanRequest`].
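    ///
    /// A hedged usage sketch (marked `ignore`); passing `filter_deleted = false`
    /// is assumed to keep deletion markers in the returned batches:
    ///
    /// ```ignore
    /// let batches = engine
    ///     .scan_batch(region_id, ScanRequest::default(), /* filter_deleted */ true)
    ///     .await?;
    /// ```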
    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    /// Returns a scanner to scan for `request`.
    async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    /// Scans a region.
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

    /// Edits a region's metadata by [RegionEdit] directly. Use with care.
    /// Currently we only allow adding files to a region (the [RegionEdit] struct may only contain a non-empty "files_to_add" field).
    /// Any other region editing intention will result in an "invalid request" error.
    /// Also note that if a region is edited directly, we MUST NOT write data to it thereafter.
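    ///
    /// # Example
    ///
    /// A hedged sketch of a valid edit (marked `ignore`; `file_meta` is an
    /// assumed, already built [FileMeta]):
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```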
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

    /// Finds the current version's memtable and SST stats by `region_id`.
    /// The stats must be collected in one place at one time to ensure consistency.
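    ///
    /// A small usage sketch (marked `ignore`; `engine` and `region_id` are assumed):
    ///
    /// ```ignore
    /// let (memtable_stats, sst_metas) = engine.find_memtable_and_sst_stats(region_id)?;
    /// println!("{} memtable(s), {} SST file(s)", memtable_stats.len(), sst_metas.len());
    /// ```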
    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }
}

/// Checks whether the region edit is valid. Currently, only adding files to the region is considered valid.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            }
        )
}

/// Inner struct of [MitoEngine].
struct EngineInner {
    /// Region workers group.
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// The WAL raw entry reader.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// Groups the requests by Kafka topic ([TopicGroupedRegionOpenRequests]) and returns them together with the remaining (non-Kafka) requests.
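///
/// A hedged sketch of the grouping outcome (marked `ignore`; building the
/// [RegionOpenRequest]s is elided):
///
/// ```ignore
/// let (by_topic, remaining) = prepare_batch_open_requests(requests)?;
/// // Requests whose `WAL_OPTIONS_KEY` option decodes to `WalOptions::Kafka` are
/// // grouped by topic; raft-engine and noop regions end up in `remaining`.
/// for (topic, regions) in &by_topic {
///     println!("topic {topic}: {} region(s)", regions.len());
/// }
/// ```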
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    /// Stop the inner engine.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    /// Get metadata of a region.
    ///
    /// Returns error if the region doesn't exist.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Spawns a task to distribute WAL entries to the receivers.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Waits for the workers to finish opening the regions.
        let responses = join_all(responses).await;

        distribution.await.context(JoinSnafu)??;
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    /// Handles a [RegionRequest] and returns its execution result.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        Ok(Some(region.find_committed_sequence()))
    }

    /// Handles the scan `request` and returns a [ScanRegion].
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        let version = region.version();
        // Get cache.
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    /// Sets the [`RegionRole`] of the region.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    /// Sets the region to read-only and ensures there are no more writes to the region after it returns.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        // Note: the region worker holds mutable ownership of the region, which ensures
        // no other thread writes to it, therefore we submit the request to the worker.
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        // TODO(weny): add metrics.
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    async fn get_last_seq_num(
        &self,
        region_id: RegionId,
    ) -> Result<Option<SequenceNumber>, BoxedError> {
        self.inner
            .get_last_seq_num(region_id)
            .map_err(BoxedError::new)
    }

    /// Retrieves the region's metadata.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stop the engine.
    ///
    /// Stopping the engine doesn't stop the underlying log store as other components might
    /// still use it. (When no other components are referencing the log store, it will
    /// automatically shut down.)
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl MitoEngine {
    fn handle_alter_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        if let Some(statistic) = self.region_statistic(region_id) {
            Self::encode_manifest_info_to_extensions(
                &region_id,
                statistic.manifest,
                &mut response.extensions,
            )?;
        }
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }

    fn handle_create_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }
}

// Test methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    time_provider,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    /// Returns the purge scheduler.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: has only "files_to_add"
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        // Invalid: "files_to_add" is empty
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: "files_to_remove" is not empty
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: other fields are not all "None"s
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
        };
        assert!(!is_valid_region_edit(&edit));
    }
}