Skip to main content

mito2/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region engine.
16
17#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34pub(crate) mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59pub mod region_hook;
60#[cfg(test)]
61mod row_selector_test;
62#[cfg(test)]
63mod scan_corrupt;
64#[cfg(test)]
65mod scan_test;
66#[cfg(test)]
67mod set_role_state_test;
68#[cfg(test)]
69mod skip_wal_test;
70#[cfg(test)]
71mod staging_test;
72#[cfg(test)]
73mod sync_test;
74#[cfg(test)]
75mod truncate_test;
76
77#[cfg(test)]
78mod copy_region_from_test;
79#[cfg(test)]
80mod remap_manifests_test;
81
82#[cfg(test)]
83mod apply_staging_manifest_test;
84#[cfg(test)]
85mod partition_filter_test;
86mod puffin_index;
87
88use std::any::Any;
89use std::collections::{HashMap, HashSet};
90use std::sync::Arc;
91use std::time::Instant;
92
93use api::region::RegionResponse;
94use async_trait::async_trait;
95use common_base::Plugins;
96use common_error::ext::BoxedError;
97use common_meta::error::UnexpectedSnafu;
98use common_meta::key::SchemaMetadataManagerRef;
99use common_recordbatch::{QueryMemoryTracker, SendableRecordBatchStream};
100use common_stat::get_total_memory_bytes;
101use common_telemetry::{info, tracing, warn};
102use common_wal::options::WalOptions;
103use futures::future::{join_all, try_join_all};
104use futures::stream::{self, Stream, StreamExt};
105use object_store::manager::ObjectStoreManagerRef;
106use snafu::{OptionExt, ResultExt, ensure};
107use store_api::ManifestVersion;
108use store_api::codec::PrimaryKeyEncoding;
109use store_api::logstore::LogStore;
110use store_api::logstore::provider::{KafkaProvider, Provider};
111use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
112use store_api::metric_engine_consts::{
113    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
114};
115use store_api::region_engine::{
116    BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine,
117    RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
118    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
119    SyncRegionFromRequest, SyncRegionFromResponse,
120};
121use store_api::region_info::RegionInfoEntry;
122use store_api::region_request::{
123    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
124};
125use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
126use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
127use tokio::sync::{Semaphore, oneshot};
128
129use crate::access_layer::RegionFilePathFactory;
130use crate::cache::{CacheManagerRef, CacheStrategy};
131use crate::config::MitoConfig;
132use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
133use crate::error::{
134    IncrementalQueryStaleSnafu, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu,
135    RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu,
136    SnapshotFenceStaleSnafu,
137};
138#[cfg(feature = "enterprise")]
139use crate::extension::BoxedExtensionRangeProviderFactory;
140use crate::gc::GcLimiterRef;
141use crate::manifest::action::RegionEdit;
142use crate::memtable::MemtableStats;
143use crate::metrics::{
144    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_EXHAUSTED_TOTAL, SCAN_MEMORY_USAGE_BYTES,
145    SCAN_REQUESTS_REJECTED_TOTAL,
146};
147use crate::read::scan_region::{ScanRegion, Scanner};
148use crate::read::stream::ScanBatchStream;
149use crate::region::MitoRegionRef;
150use crate::region::opener::PartitionExprFetcherRef;
151use crate::region::options::parse_wal_options;
152use crate::request::{RegionEditRequest, WorkerRequest};
153use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
154use crate::sst::file_ref::FileReferenceManagerRef;
155use crate::sst::index::intermediate::IntermediateManager;
156use crate::sst::index::puffin_manager::PuffinManagerFactory;
157use crate::wal::entry_distributor::{
158    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
159};
160use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
161use crate::worker::WorkerGroup;
162
/// Name of the mito region engine.
pub const MITO_ENGINE_NAME: &str = "mito";
164
/// Builder of [MitoEngine].
///
/// Holds every dependency needed to start the engine's region workers; call
/// [MitoEngineBuilder::try_build] to construct the engine.
pub struct MitoEngineBuilder<'a, S: LogStore> {
    /// Path passed to `MitoConfig::sanitize` when building the engine.
    data_home: &'a str,
    /// Engine config; sanitized in `try_build` and then shared with the workers.
    config: MitoConfig,
    /// Log store handed to the worker group and wrapped into the raw WAL entry reader.
    log_store: Arc<S>,
    /// Object store manager handed to the worker group.
    object_store_manager: ObjectStoreManagerRef,
    /// Schema metadata manager handed to the worker group.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// File reference manager handed to the worker group.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expression fetcher handed to the worker group.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Plugins handed to the worker group.
    plugins: Plugins,
    /// Optional extension range provider factory, forwarded to the engine internals.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
177
178impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
179    #[allow(clippy::too_many_arguments)]
180    pub fn new(
181        data_home: &'a str,
182        config: MitoConfig,
183        log_store: Arc<S>,
184        object_store_manager: ObjectStoreManagerRef,
185        schema_metadata_manager: SchemaMetadataManagerRef,
186        file_ref_manager: FileReferenceManagerRef,
187        partition_expr_fetcher: PartitionExprFetcherRef,
188        plugins: Plugins,
189    ) -> Self {
190        Self {
191            data_home,
192            config,
193            log_store,
194            object_store_manager,
195            schema_metadata_manager,
196            file_ref_manager,
197            plugins,
198            partition_expr_fetcher,
199            #[cfg(feature = "enterprise")]
200            extension_range_provider_factory: None,
201        }
202    }
203
204    #[cfg(feature = "enterprise")]
205    #[must_use]
206    pub fn with_extension_range_provider_factory(
207        self,
208        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
209    ) -> Self {
210        Self {
211            extension_range_provider_factory,
212            ..self
213        }
214    }
215
216    pub async fn try_build(mut self) -> Result<MitoEngine> {
217        self.config.sanitize(self.data_home)?;
218
219        let config = Arc::new(self.config);
220        let workers = WorkerGroup::start(
221            config.clone(),
222            self.log_store.clone(),
223            self.object_store_manager,
224            self.schema_metadata_manager,
225            self.file_ref_manager,
226            self.partition_expr_fetcher.clone(),
227            self.plugins,
228        )
229        .await?;
230        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
231        let total_memory = get_total_memory_bytes().max(0) as u64;
232        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
233        let scan_memory_tracker =
234            QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
235                .on_update(|usage| {
236                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
237                })
238                .on_exhausted(|| {
239                    SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
240                })
241                .on_reject(|| {
242                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
243                })
244                .build();
245
246        let inner = EngineInner {
247            workers,
248            config,
249            wal_raw_entry_reader,
250            scan_memory_tracker,
251            #[cfg(feature = "enterprise")]
252            extension_range_provider_factory: None,
253        };
254
255        #[cfg(feature = "enterprise")]
256        let inner =
257            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
258
259        Ok(MitoEngine {
260            inner: Arc::new(inner),
261        })
262    }
263}
264
/// Region engine implementation for timeseries data.
///
/// Cloning is cheap: every clone shares the same inner engine state through an [Arc].
#[derive(Clone)]
pub struct MitoEngine {
    /// Shared engine state (worker group, config, WAL reader, scan memory tracker).
    inner: Arc<EngineInner>,
}
270
271impl MitoEngine {
272    /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
273    #[allow(clippy::too_many_arguments)]
274    pub async fn new<S: LogStore>(
275        data_home: &str,
276        config: MitoConfig,
277        log_store: Arc<S>,
278        object_store_manager: ObjectStoreManagerRef,
279        schema_metadata_manager: SchemaMetadataManagerRef,
280        file_ref_manager: FileReferenceManagerRef,
281        partition_expr_fetcher: PartitionExprFetcherRef,
282        plugins: Plugins,
283    ) -> Result<MitoEngine> {
284        let builder = MitoEngineBuilder::new(
285            data_home,
286            config,
287            log_store,
288            object_store_manager,
289            schema_metadata_manager,
290            file_ref_manager,
291            partition_expr_fetcher,
292            plugins,
293        );
294        builder.try_build().await
295    }
296
297    pub fn mito_config(&self) -> &MitoConfig {
298        &self.inner.config
299    }
300
301    pub fn cache_manager(&self) -> CacheManagerRef {
302        self.inner.workers.cache_manager()
303    }
304
305    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
306        self.inner.workers.file_ref_manager()
307    }
308
309    pub fn gc_limiter(&self) -> GcLimiterRef {
310        self.inner.workers.gc_limiter()
311    }
312
313    pub fn object_store_manager(&self) -> &ObjectStoreManagerRef {
314        self.inner.workers.object_store_manager()
315    }
316
317    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
318        self.inner.workers.puffin_manager_factory()
319    }
320
321    pub fn intermediate_manager(&self) -> &IntermediateManager {
322        self.inner.workers.intermediate_manager()
323    }
324
325    pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
326        self.inner.workers.schema_metadata_manager()
327    }
328
329    /// Get all tmp ref files for given region ids, excluding files that's already in manifest.
330    pub async fn get_snapshot_of_file_refs(
331        &self,
332        file_handle_regions: impl IntoIterator<Item = RegionId>,
333        related_regions: HashMap<RegionId, HashSet<RegionId>>,
334    ) -> Result<FileRefsManifest> {
335        let file_ref_mgr = self.file_ref_manager();
336
337        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
338        // Convert region IDs to MitoRegionRef objects, ignore regions that do not exist on current datanode
339        // as regions on other datanodes are not managed by this engine.
340        let query_regions: Vec<MitoRegionRef> = file_handle_regions
341            .into_iter()
342            .filter_map(|region_id| self.find_region(region_id))
343            .collect();
344
345        let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
346            let dst2src = related_regions
347                .into_iter()
348                .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
349                .fold(
350                    HashMap::<RegionId, HashSet<RegionId>>::new(),
351                    |mut acc, (k, v)| {
352                        let entry = acc.entry(k).or_default();
353                        entry.insert(v);
354                        acc
355                    },
356                );
357            let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
358            for (dst_region, srcs) in dst2src {
359                let Some(dst_region) = self.find_region(dst_region) else {
360                    continue;
361                };
362                dst_region_to_src_regions.push((dst_region, srcs));
363            }
364            dst_region_to_src_regions
365        };
366
367        file_ref_mgr
368            .get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
369            .await
370    }
371
372    /// Returns true if the specific region exists.
373    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
374        self.inner.workers.is_region_exists(region_id)
375    }
376
377    /// Returns true if the specific region exists.
378    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
379        self.inner.workers.is_region_opening(region_id)
380    }
381
382    /// Returns true if the specific region is catching up.
383    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
384        self.inner.workers.is_region_catching_up(region_id)
385    }
386
387    /// Returns the region disk/memory statistic.
388    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
389        self.find_region(region_id)
390            .map(|region| region.region_statistic())
391    }
392
393    /// Returns primary key encoding of the region.
394    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
395        self.find_region(region_id)
396            .map(|r| r.primary_key_encoding())
397    }
398
399    /// Handle substrait query and return a stream of record batches
400    ///
401    /// Notice that the output stream's ordering is not guranateed. If order
402    /// matter, please use [`scanner`] to build a [`Scanner`] to consume.
403    #[tracing::instrument(skip_all)]
404    pub async fn scan_to_stream(
405        &self,
406        region_id: RegionId,
407        request: ScanRequest,
408    ) -> Result<SendableRecordBatchStream, BoxedError> {
409        self.scanner(region_id, request)
410            .await
411            .map_err(BoxedError::new)?
412            .scan()
413            .await
414    }
415
416    /// Scan [`Batch`]es by [`ScanRequest`].
417    pub async fn scan_batch(
418        &self,
419        region_id: RegionId,
420        request: ScanRequest,
421        filter_deleted: bool,
422    ) -> Result<ScanBatchStream> {
423        let mut scan_region = self.scan_region(region_id, request)?;
424        scan_region.set_filter_deleted(filter_deleted);
425        scan_region.scanner().await?.scan_batch()
426    }
427
428    /// Returns a scanner to scan for `request`.
429    pub(crate) async fn scanner(
430        &self,
431        region_id: RegionId,
432        request: ScanRequest,
433    ) -> Result<Scanner> {
434        self.scan_region(region_id, request)?.scanner().await
435    }
436
437    /// Scans a region.
438    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
439    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
440        self.inner.scan_region(region_id, request)
441    }
442
443    /// Edit region's metadata by [RegionEdit] directly. Use with care.
444    /// Now we only allow adding files or removing files from region (the [RegionEdit] struct can only contain a non-empty "files_to_add" or "files_to_remove" field).
445    /// Other region editing intention will result in an "invalid request" error.
446    /// Also note that if a region is to be edited directly, we MUST not write data to it thereafter.
447    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
448        let _timer = HANDLE_REQUEST_ELAPSED
449            .with_label_values(&["edit_region"])
450            .start_timer();
451
452        ensure!(
453            is_valid_region_edit(&edit),
454            InvalidRequestSnafu {
455                region_id,
456                reason: "invalid region edit"
457            }
458        );
459
460        let (tx, rx) = oneshot::channel();
461        let request = WorkerRequest::EditRegion(RegionEditRequest::new(region_id, edit, true, tx));
462        self.inner
463            .workers
464            .submit_to_worker(region_id, request)
465            .await?;
466        rx.await.context(RecvSnafu)?
467    }
468
469    /// Handles copy region from request.
470    ///
471    /// This method is only supported for internal use and is not exposed in the trait implementation.
472    pub async fn copy_region_from(
473        &self,
474        region_id: RegionId,
475        request: MitoCopyRegionFromRequest,
476    ) -> Result<MitoCopyRegionFromResponse> {
477        self.inner.copy_region_from(region_id, request).await
478    }
479
480    #[cfg(test)]
481    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
482        self.find_region(id)
483    }
484
485    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
486        self.inner.workers.get_region(region_id)
487    }
488
489    /// Returns all regions.
490    pub fn regions(&self) -> Vec<MitoRegionRef> {
491        self.inner.workers.all_regions().collect()
492    }
493
494    fn encode_manifest_info_to_extensions(
495        region_id: &RegionId,
496        manifest_info: RegionManifestInfo,
497        extensions: &mut HashMap<String, Vec<u8>>,
498    ) -> Result<()> {
499        let region_manifest_info = vec![(*region_id, manifest_info)];
500
501        extensions.insert(
502            MANIFEST_INFO_EXTENSION_KEY.to_string(),
503            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
504        );
505        info!(
506            "Added manifest info: {:?} to extensions, region_id: {:?}",
507            region_manifest_info, region_id
508        );
509        Ok(())
510    }
511
512    fn encode_column_metadatas_to_extensions(
513        region_id: &RegionId,
514        column_metadatas: Vec<ColumnMetadata>,
515        extensions: &mut HashMap<String, Vec<u8>>,
516    ) -> Result<()> {
517        extensions.insert(
518            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
519            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
520        );
521        info!(
522            "Added column metadatas: {:?} to extensions, region_id: {:?}",
523            column_metadatas, region_id
524        );
525        Ok(())
526    }
527
528    /// Find the current version's memtables and SSTs stats by region_id.
529    /// The stats must be collected in one place one time to ensure data consistency.
530    pub fn find_memtable_and_sst_stats(
531        &self,
532        region_id: RegionId,
533    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
534        let region = self
535            .find_region(region_id)
536            .context(RegionNotFoundSnafu { region_id })?;
537
538        let version = region.version();
539        let memtable_stats = version
540            .memtables
541            .list_memtables()
542            .iter()
543            .map(|x| x.stats())
544            .collect::<Vec<_>>();
545
546        let sst_stats = version
547            .ssts
548            .levels()
549            .iter()
550            .flat_map(|level| level.files().map(|x| x.meta_ref()))
551            .cloned()
552            .collect::<Vec<_>>();
553        Ok((memtable_stats, sst_stats))
554    }
555
556    /// Lists all SSTs from the manifest of all regions in the engine.
557    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
558        let node_id = self.inner.workers.file_ref_manager().node_id();
559        let regions = self.inner.workers.all_regions();
560
561        let mut results = Vec::new();
562        for region in regions {
563            let mut entries = region.manifest_sst_entries().await;
564            for e in &mut entries {
565                e.node_id = node_id;
566            }
567            results.extend(entries);
568        }
569
570        results
571    }
572
573    /// Lists metadata about all puffin index targets stored in the engine.
574    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
575        let node_id = self.inner.workers.file_ref_manager().node_id();
576        let cache_manager = self.inner.workers.cache_manager();
577        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
578        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
579        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
580
581        let mut results = Vec::new();
582
583        for region in self.inner.workers.all_regions() {
584            let manifest_entries = region.manifest_sst_entries().await;
585            let access_layer = region.access_layer.clone();
586            let table_dir = access_layer.table_dir().to_string();
587            let path_type = access_layer.path_type();
588            let object_store = access_layer.object_store().clone();
589            let puffin_factory = access_layer.puffin_manager_factory().clone();
590            let path_factory = RegionFilePathFactory::new(table_dir, path_type);
591
592            let entry_futures = manifest_entries.into_iter().map(|entry| {
593                let object_store = object_store.clone();
594                let path_factory = path_factory.clone();
595                let puffin_factory = puffin_factory.clone();
596                let puffin_metadata_cache = puffin_metadata_cache.clone();
597                let bloom_filter_cache = bloom_filter_cache.clone();
598                let inverted_index_cache = inverted_index_cache.clone();
599
600                async move {
601                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
602                        return Vec::new();
603                    };
604
605                    let index_version = entry.index_version;
606                    let file_id = match FileId::parse_str(&entry.file_id) {
607                        Ok(file_id) => file_id,
608                        Err(err) => {
609                            warn!(
610                                err;
611                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
612                                entry.table_dir,
613                                entry.file_id
614                            );
615                            return Vec::new();
616                        }
617                    };
618                    // The index file path is derived from the physical file owner. After
619                    // repartition, `entry.region_id` is only the referring region.
620                    let region_index_id = RegionIndexId::new(
621                        RegionFileId::new(entry.origin_region_id, file_id),
622                        index_version,
623                    );
624                    let context = IndexEntryContext {
625                        table_dir: &entry.table_dir,
626                        index_file_path: index_file_path.as_str(),
627                        region_id: entry.region_id,
628                        table_id: entry.table_id,
629                        region_number: entry.region_number,
630                        region_group: entry.region_group,
631                        region_sequence: entry.region_sequence,
632                        file_id: &entry.file_id,
633                        index_file_size: entry.index_file_size,
634                        node_id,
635                    };
636
637                    let manager = puffin_factory
638                        .build(object_store, path_factory)
639                        .with_puffin_metadata_cache(puffin_metadata_cache);
640
641                    collect_index_entries_from_puffin(
642                        manager,
643                        region_index_id,
644                        context,
645                        bloom_filter_cache,
646                        inverted_index_cache,
647                    )
648                    .await
649                }
650            });
651
652            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8.
653            while let Some(mut metas) = meta_stream.next().await {
654                results.append(&mut metas);
655            }
656        }
657
658        results
659    }
660
661    /// Lists region info entries of all regions in the engine.
662    pub async fn all_region_infos(&self) -> Vec<RegionInfoEntry> {
663        let node_id = self.inner.workers.file_ref_manager().node_id();
664        self.inner
665            .workers
666            .all_regions()
667            .map(|region| region.region_info_entry(node_id))
668            .collect()
669    }
670
671    /// Lists all SSTs from the storage layer of all regions in the engine.
672    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
673        let node_id = self.inner.workers.file_ref_manager().node_id();
674        let regions = self.inner.workers.all_regions();
675
676        let mut layers_distinct_table_dirs = HashMap::new();
677        for region in regions {
678            let table_dir = region.access_layer.table_dir();
679            if !layers_distinct_table_dirs.contains_key(table_dir) {
680                layers_distinct_table_dirs
681                    .insert(table_dir.to_string(), region.access_layer.clone());
682            }
683        }
684
685        stream::iter(layers_distinct_table_dirs)
686            .map(|(_, access_layer)| access_layer.storage_sst_entries())
687            .flatten()
688            .map(move |entry| {
689                entry.map(move |mut entry| {
690                    entry.node_id = node_id;
691                    entry
692                })
693            })
694    }
695}
696
697/// Check whether the region edit is valid.
698///
699/// Only adding or removing files to region is considered valid now.
700fn is_valid_region_edit(edit: &RegionEdit) -> bool {
701    (!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty())
702        && matches!(
703            edit,
704            RegionEdit {
705                files_to_add: _,
706                files_to_remove: _,
707                timestamp_ms: _,
708                compaction_time_window: None,
709                flushed_entry_id: None,
710                flushed_sequence: None,
711                ..
712            }
713        )
714}
715
/// Inner struct of [MitoEngine], shared by all clones of the engine.
struct EngineInner {
    /// Region workers group.
    workers: WorkerGroup,
    /// Config of the engine (sanitized by the builder before the engine is constructed).
    config: Arc<MitoConfig>,
    /// Reader of raw WAL entries; used to build per-topic entry distributors when opening
    /// Kafka-backed regions in batch.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    /// Memory tracker for table scans.
    scan_memory_tracker: QueryMemoryTracker,
    /// Optional extension range provider factory (enterprise only).
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
729
/// Region open requests grouped by the Kafka topic of their WAL.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
731
732/// Returns requests([TopicGroupedRegionOpenRequests]) grouped by topic and remaining requests.
733fn prepare_batch_open_requests(
734    requests: Vec<(RegionId, RegionOpenRequest)>,
735) -> Result<(
736    TopicGroupedRegionOpenRequests,
737    Vec<(RegionId, RegionOpenRequest)>,
738)> {
739    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
740    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
741    for (region_id, request) in requests {
742        match parse_wal_options(&request.options).context(SerdeJsonSnafu)? {
743            WalOptions::Kafka(options) => {
744                topic_to_regions
745                    .entry(options.topic)
746                    .or_default()
747                    .push((region_id, request));
748            }
749            WalOptions::RaftEngine | WalOptions::Noop => {
750                remaining_regions.push((region_id, request));
751            }
752        }
753    }
754
755    Ok((topic_to_regions, remaining_regions))
756}
757
758impl EngineInner {
759    #[cfg(feature = "enterprise")]
760    #[must_use]
761    fn with_extension_range_provider_factory(
762        self,
763        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
764    ) -> Self {
765        Self {
766            extension_range_provider_factory,
767            ..self
768        }
769    }
770
771    /// Stop the inner engine.
772    async fn stop(&self) -> Result<()> {
773        self.workers.stop().await
774    }
775
776    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
777        self.workers
778            .get_region(region_id)
779            .context(RegionNotFoundSnafu { region_id })
780    }
781
782    /// Get metadata of a region.
783    ///
784    /// Returns error if the region doesn't exist.
785    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
786        // Reading a region doesn't need to go through the region worker thread.
787        let region = self.find_region(region_id)?;
788        Ok(region.metadata())
789    }
790
    /// Opens all regions whose WAL is the Kafka topic `topic`.
    ///
    /// Builds a WAL entry distributor plus entry receivers for the topic's regions. Each open
    /// request is submitted to its region worker together with one receiver. The distributor
    /// then runs on the global runtime while the worker responses are awaited.
    /// NOTE(review): the `zip` below assumes `entry_receivers` has the same length and order
    /// as `region_ids` — confirm in `build_wal_entry_distributor_and_receivers`.
    ///
    /// Returns one `(region_id, result)` pair per request, in request order.
    ///
    /// # Errors
    /// Returns an error, discarding the per-region results, in any of these cases:
    /// - submitting a request to a worker fails;
    /// - the spawned distribution task cannot be joined;
    /// - distribution itself fails.
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            // A submission failure aborts the whole batch via `?`.
            self.workers.submit_to_worker(region_id, request).await?;
            // Deferred future resolving to the worker's reply for this region.
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Spawns entry distribution so it runs concurrently with the workers consuming entries.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Waits for worker returns.
        let responses = join_all(responses).await;
        // First `?`: the task failed to join; second `?`: the distributor returned an error.
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            // Safety: provider is kafka provider.
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }
837
838    async fn handle_batch_open_requests(
839        &self,
840        parallelism: usize,
841        requests: Vec<(RegionId, RegionOpenRequest)>,
842    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
843        let semaphore = Arc::new(Semaphore::new(parallelism));
844        let (topic_to_region_requests, remaining_region_requests) =
845            prepare_batch_open_requests(requests)?;
846        let mut responses =
847            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
848
849        if !topic_to_region_requests.is_empty() {
850            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
851            for (topic, region_requests) in topic_to_region_requests {
852                let semaphore_moved = semaphore.clone();
853                tasks.push(async move {
854                    // Safety: semaphore must exist
855                    let _permit = semaphore_moved.acquire().await.unwrap();
856                    self.open_topic_regions(topic, region_requests).await
857                })
858            }
859            let r = try_join_all(tasks).await?;
860            responses.extend(r.into_iter().flatten());
861        }
862
863        if !remaining_region_requests.is_empty() {
864            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
865            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
866            for (region_id, request) in remaining_region_requests {
867                let semaphore_moved = semaphore.clone();
868                region_ids.push(region_id);
869                tasks.push(async move {
870                    // Safety: semaphore must exist
871                    let _permit = semaphore_moved.acquire().await.unwrap();
872                    let (request, receiver) =
873                        WorkerRequest::new_open_region_request(region_id, request, None);
874
875                    self.workers.submit_to_worker(region_id, request).await?;
876
877                    receiver.await.context(RecvSnafu)?
878                })
879            }
880
881            let results = join_all(tasks).await;
882            responses.extend(region_ids.into_iter().zip(results));
883        }
884
885        Ok(responses)
886    }
887
888    async fn catchup_topic_regions(
889        &self,
890        provider: Provider,
891        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
892    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
893        let now = Instant::now();
894        let region_ids = region_requests
895            .iter()
896            .map(|(region_id, _)| *region_id)
897            .collect::<Vec<_>>();
898        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
899            provider.clone(),
900            self.wal_raw_entry_reader.clone(),
901            &region_ids,
902            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
903        );
904
905        let mut responses = Vec::with_capacity(region_requests.len());
906        for ((region_id, request), entry_receiver) in
907            region_requests.into_iter().zip(entry_receivers)
908        {
909            let (request, receiver) =
910                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
911            self.workers.submit_to_worker(region_id, request).await?;
912            responses.push(async move { receiver.await.context(RecvSnafu)? });
913        }
914
915        // Wait for entries distribution.
916        let distribution =
917            common_runtime::spawn_global(async move { distributor.distribute().await });
918        // Wait for worker returns.
919        let responses = join_all(responses).await;
920        distribution.await.context(JoinSnafu)??;
921
922        let num_failure = responses.iter().filter(|r| r.is_err()).count();
923        info!(
924            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
925            region_ids.len() - num_failure,
926            // Safety: provider is kafka provider.
927            provider.as_kafka_provider().unwrap(),
928            num_failure,
929            now.elapsed(),
930        );
931
932        Ok(region_ids.into_iter().zip(responses).collect())
933    }
934
935    async fn handle_batch_catchup_requests(
936        &self,
937        parallelism: usize,
938        requests: Vec<(RegionId, RegionCatchupRequest)>,
939    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
940        let mut responses = Vec::with_capacity(requests.len());
941        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
942        let mut remaining_region_requests = vec![];
943
944        for (region_id, request) in requests {
945            match self.workers.get_region(region_id) {
946                Some(region) => match region.provider.as_kafka_provider() {
947                    Some(provider) => {
948                        topic_regions
949                            .entry(provider.clone())
950                            .or_default()
951                            .push((region_id, request));
952                    }
953                    None => {
954                        remaining_region_requests.push((region_id, request));
955                    }
956                },
957                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
958            }
959        }
960
961        let semaphore = Arc::new(Semaphore::new(parallelism));
962
963        if !topic_regions.is_empty() {
964            let mut tasks = Vec::with_capacity(topic_regions.len());
965            for (provider, region_requests) in topic_regions {
966                let semaphore_moved = semaphore.clone();
967                tasks.push(async move {
968                    // Safety: semaphore must exist
969                    let _permit = semaphore_moved.acquire().await.unwrap();
970                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
971                        .await
972                })
973            }
974
975            let r = try_join_all(tasks).await?;
976            responses.extend(r.into_iter().flatten());
977        }
978
979        if !remaining_region_requests.is_empty() {
980            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
981            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
982            for (region_id, request) in remaining_region_requests {
983                let semaphore_moved = semaphore.clone();
984                region_ids.push(region_id);
985                tasks.push(async move {
986                    // Safety: semaphore must exist
987                    let _permit = semaphore_moved.acquire().await.unwrap();
988                    let (request, receiver) =
989                        WorkerRequest::new_catchup_region_request(region_id, request, None);
990
991                    self.workers.submit_to_worker(region_id, request).await?;
992
993                    receiver.await.context(RecvSnafu)?
994                })
995            }
996
997            let results = join_all(tasks).await;
998            responses.extend(region_ids.into_iter().zip(results));
999        }
1000
1001        Ok(responses)
1002    }
1003
1004    /// Handles [RegionRequest] and return its executed result.
1005    async fn handle_request(
1006        &self,
1007        region_id: RegionId,
1008        request: RegionRequest,
1009    ) -> Result<AffectedRows> {
1010        let region_metadata = self.get_metadata(region_id).ok();
1011        let (request, receiver) =
1012            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
1013        self.workers.submit_to_worker(region_id, request).await?;
1014
1015        receiver.await.context(RecvSnafu)?
1016    }
1017
1018    /// Returns the sequence of latest committed data.
1019    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
1020        // Reading a region doesn't need to go through the region worker thread.
1021        self.find_region(region_id)
1022            .map(|r| r.find_committed_sequence())
1023    }
1024
1025    /// Handles the scan `request` and returns a [ScanRegion].
1026    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
1027    fn scan_region(&self, region_id: RegionId, mut request: ScanRequest) -> Result<ScanRegion> {
1028        let query_start = Instant::now();
1029        // Reading a region doesn't need to go through the region worker thread.
1030        let region = self.find_region(region_id)?;
1031        let version_data = region.version_control.current();
1032        let version = version_data.version;
1033
1034        if request.snapshot_on_scan && request.memtable_max_sequence.is_none() {
1035            request.memtable_max_sequence = Some(version_data.committed_sequence);
1036        }
1037
1038        if let Some(given_seq) = request.memtable_min_sequence {
1039            let min_readable_seq = version.flushed_sequence;
1040            ensure!(
1041                given_seq >= min_readable_seq,
1042                IncrementalQueryStaleSnafu {
1043                    region_id,
1044                    given_seq,
1045                    min_readable_seq,
1046                }
1047            );
1048        }
1049
1050        if let Some(given_seq) = request.memtable_max_sequence
1051            && !request.skip_sst_files
1052        {
1053            // Explicit snapshot fences that include SST reads are enforceable
1054            // only while the requested upper bound is not older than the
1055            // region's flushed frontier. If H has already been flushed into SST,
1056            // mito cannot apply a memtable-only sequence upper bound to that
1057            // SST scan, so fail and let Flow rebind the fenced repair instead
1058            // of reading rows beyond H.
1059            let min_enforceable_seq = version.flushed_sequence;
1060            ensure!(
1061                given_seq >= min_enforceable_seq,
1062                SnapshotFenceStaleSnafu {
1063                    region_id,
1064                    given_seq,
1065                    min_enforceable_seq,
1066                }
1067            );
1068        }
1069
1070        // Get cache.
1071        let cache_manager = self.workers.cache_manager();
1072
1073        let scan_region = ScanRegion::new(
1074            version,
1075            region.access_layer.clone(),
1076            request,
1077            CacheStrategy::EnableAll(cache_manager),
1078        )
1079        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
1080        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
1081        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
1082        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
1083        .with_start_time(query_start);
1084
1085        #[cfg(feature = "enterprise")]
1086        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
1087
1088        Ok(scan_region)
1089    }
1090
1091    #[cfg(feature = "enterprise")]
1092    fn maybe_fill_extension_range_provider(
1093        &self,
1094        mut scan_region: ScanRegion,
1095        region: MitoRegionRef,
1096    ) -> ScanRegion {
1097        if region.is_follower()
1098            && let Some(factory) = self.extension_range_provider_factory.as_ref()
1099        {
1100            scan_region
1101                .set_extension_range_provider(factory.create_extension_range_provider(region));
1102        }
1103        scan_region
1104    }
1105
1106    /// Converts the [`RegionRole`].
1107    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
1108        let region = self.find_region(region_id)?;
1109        region.set_role(role);
1110        Ok(())
1111    }
1112
1113    /// Sets read-only for a region and ensures no more writes in the region after it returns.
1114    async fn set_region_role_state_gracefully(
1115        &self,
1116        region_id: RegionId,
1117        region_role_state: SettableRegionRoleState,
1118    ) -> Result<SetRegionRoleStateResponse> {
1119        // Notes: It acquires the mutable ownership to ensure no other threads,
1120        // Therefore, we submit it to the worker.
1121        let (request, receiver) =
1122            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
1123        self.workers.submit_to_worker(region_id, request).await?;
1124
1125        receiver.await.context(RecvSnafu)
1126    }
1127
1128    async fn sync_region(
1129        &self,
1130        region_id: RegionId,
1131        manifest_info: RegionManifestInfo,
1132    ) -> Result<(ManifestVersion, bool)> {
1133        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
1134        let manifest_version = manifest_info.data_manifest_version();
1135        let (request, receiver) =
1136            WorkerRequest::new_sync_region_request(region_id, manifest_version);
1137        self.workers.submit_to_worker(region_id, request).await?;
1138
1139        receiver.await.context(RecvSnafu)?
1140    }
1141
1142    async fn remap_manifests(
1143        &self,
1144        request: RemapManifestsRequest,
1145    ) -> Result<RemapManifestsResponse> {
1146        let region_id = request.region_id;
1147        let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
1148        self.workers.submit_to_worker(region_id, request).await?;
1149        let manifest_paths = receiver.await.context(RecvSnafu)??;
1150        Ok(RemapManifestsResponse { manifest_paths })
1151    }
1152
1153    async fn copy_region_from(
1154        &self,
1155        region_id: RegionId,
1156        request: MitoCopyRegionFromRequest,
1157    ) -> Result<MitoCopyRegionFromResponse> {
1158        let (request, receiver) =
1159            WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
1160        self.workers.submit_to_worker(region_id, request).await?;
1161        let response = receiver.await.context(RecvSnafu)??;
1162        Ok(response)
1163    }
1164
1165    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1166        self.workers
1167            .get_region(region_id)
1168            .map(|region| region.region_role())
1169    }
1170}
1171
1172fn map_batch_responses(responses: Vec<(RegionId, Result<AffectedRows>)>) -> BatchResponses {
1173    responses
1174        .into_iter()
1175        .map(|(region_id, response)| {
1176            (
1177                region_id,
1178                response.map(RegionResponse::new).map_err(BoxedError::new),
1179            )
1180        })
1181        .collect()
1182}
1183
1184#[async_trait]
1185impl RegionEngine for MitoEngine {
1186    fn name(&self) -> &str {
1187        MITO_ENGINE_NAME
1188    }
1189
1190    #[tracing::instrument(skip_all)]
1191    async fn handle_batch_open_requests(
1192        &self,
1193        parallelism: usize,
1194        requests: Vec<(RegionId, RegionOpenRequest)>,
1195    ) -> Result<BatchResponses, BoxedError> {
1196        // TODO(weny): add metrics.
1197        self.inner
1198            .handle_batch_open_requests(parallelism, requests)
1199            .await
1200            .map(map_batch_responses)
1201            .map_err(BoxedError::new)
1202    }
1203
1204    #[tracing::instrument(skip_all)]
1205    async fn handle_batch_catchup_requests(
1206        &self,
1207        parallelism: usize,
1208        requests: Vec<(RegionId, RegionCatchupRequest)>,
1209    ) -> Result<BatchResponses, BoxedError> {
1210        self.inner
1211            .handle_batch_catchup_requests(parallelism, requests)
1212            .await
1213            .map(map_batch_responses)
1214            .map_err(BoxedError::new)
1215    }
1216
1217    #[tracing::instrument(skip_all)]
1218    async fn handle_request(
1219        &self,
1220        region_id: RegionId,
1221        request: RegionRequest,
1222    ) -> Result<RegionResponse, BoxedError> {
1223        let _timer = HANDLE_REQUEST_ELAPSED
1224            .with_label_values(&[request.request_type()])
1225            .start_timer();
1226
1227        let is_alter = matches!(request, RegionRequest::Alter(_));
1228        let is_create = matches!(request, RegionRequest::Create(_));
1229        let mut response = self
1230            .inner
1231            .handle_request(region_id, request)
1232            .await
1233            .map(RegionResponse::new)
1234            .map_err(BoxedError::new)?;
1235
1236        if is_alter {
1237            self.handle_alter_response(region_id, &mut response)
1238                .map_err(BoxedError::new)?;
1239        } else if is_create {
1240            self.handle_create_response(region_id, &mut response)
1241                .map_err(BoxedError::new)?;
1242        }
1243
1244        Ok(response)
1245    }
1246
1247    #[tracing::instrument(skip_all)]
1248    async fn handle_query(
1249        &self,
1250        region_id: RegionId,
1251        request: ScanRequest,
1252    ) -> Result<RegionScannerRef, BoxedError> {
1253        self.scan_region(region_id, request)
1254            .map_err(BoxedError::new)?
1255            .region_scanner()
1256            .await
1257            .map_err(BoxedError::new)
1258    }
1259
1260    fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
1261        Some(self.inner.scan_memory_tracker.clone())
1262    }
1263
1264    async fn get_committed_sequence(
1265        &self,
1266        region_id: RegionId,
1267    ) -> Result<SequenceNumber, BoxedError> {
1268        self.inner
1269            .get_committed_sequence(region_id)
1270            .map_err(BoxedError::new)
1271    }
1272
1273    /// Retrieve region's metadata.
1274    async fn get_metadata(
1275        &self,
1276        region_id: RegionId,
1277    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
1278        self.inner.get_metadata(region_id).map_err(BoxedError::new)
1279    }
1280
1281    /// Stop the engine.
1282    ///
1283    /// Stopping the engine doesn't stop the underlying log store as other components might
1284    /// still use it. (When no other components are referencing the log store, it will
1285    /// automatically shutdown.)
1286    async fn stop(&self) -> std::result::Result<(), BoxedError> {
1287        self.inner.stop().await.map_err(BoxedError::new)
1288    }
1289
1290    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
1291        self.get_region_statistic(region_id)
1292    }
1293
1294    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
1295        self.inner
1296            .set_region_role(region_id, role)
1297            .map_err(BoxedError::new)
1298    }
1299
1300    async fn set_region_role_state_gracefully(
1301        &self,
1302        region_id: RegionId,
1303        region_role_state: SettableRegionRoleState,
1304    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
1305        let _timer = HANDLE_REQUEST_ELAPSED
1306            .with_label_values(&["set_region_role_state_gracefully"])
1307            .start_timer();
1308
1309        self.inner
1310            .set_region_role_state_gracefully(region_id, region_role_state)
1311            .await
1312            .map_err(BoxedError::new)
1313    }
1314
1315    async fn sync_region(
1316        &self,
1317        region_id: RegionId,
1318        request: SyncRegionFromRequest,
1319    ) -> Result<SyncRegionFromResponse, BoxedError> {
1320        let manifest_info = request
1321            .into_region_manifest_info()
1322            .context(UnexpectedSnafu {
1323                err_msg: "Expected a manifest info request",
1324            })
1325            .map_err(BoxedError::new)?;
1326        let (_, synced) = self
1327            .inner
1328            .sync_region(region_id, manifest_info)
1329            .await
1330            .map_err(BoxedError::new)?;
1331
1332        Ok(SyncRegionFromResponse::Mito { synced })
1333    }
1334
1335    async fn remap_manifests(
1336        &self,
1337        request: RemapManifestsRequest,
1338    ) -> Result<RemapManifestsResponse, BoxedError> {
1339        self.inner
1340            .remap_manifests(request)
1341            .await
1342            .map_err(BoxedError::new)
1343    }
1344
1345    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1346        self.inner.role(region_id)
1347    }
1348
1349    fn as_any(&self) -> &dyn Any {
1350        self
1351    }
1352}
1353
1354impl MitoEngine {
1355    fn handle_alter_response(
1356        &self,
1357        region_id: RegionId,
1358        response: &mut RegionResponse,
1359    ) -> Result<()> {
1360        if let Some(statistic) = self.region_statistic(region_id) {
1361            Self::encode_manifest_info_to_extensions(
1362                &region_id,
1363                statistic.manifest,
1364                &mut response.extensions,
1365            )?;
1366        }
1367        let column_metadatas = self
1368            .inner
1369            .find_region(region_id)
1370            .ok()
1371            .map(|r| r.metadata().column_metadatas.clone());
1372        if let Some(column_metadatas) = column_metadatas {
1373            Self::encode_column_metadatas_to_extensions(
1374                &region_id,
1375                column_metadatas,
1376                &mut response.extensions,
1377            )?;
1378        }
1379        Ok(())
1380    }
1381
1382    fn handle_create_response(
1383        &self,
1384        region_id: RegionId,
1385        response: &mut RegionResponse,
1386    ) -> Result<()> {
1387        let column_metadatas = self
1388            .inner
1389            .find_region(region_id)
1390            .ok()
1391            .map(|r| r.metadata().column_metadatas.clone());
1392        if let Some(column_metadatas) = column_metadatas {
1393            Self::encode_column_metadatas_to_extensions(
1394                &region_id,
1395                column_metadatas,
1396                &mut response.extensions,
1397            )?;
1398        }
1399        Ok(())
1400    }
1401}
1402
// Test-only methods.
#[cfg(any(test, feature = "test"))]
// `new_for_test` takes every test dependency as a separate argument.
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests.
    ///
    /// Sanitizes `config` against `data_home`, wraps `log_store` in a raw WAL entry reader,
    /// sizes the scan memory tracker from the host's total memory, and starts a test worker
    /// group with the supplied hooks.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;
        let config = Arc::new(config);

        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        let scan_memory_tracker = {
            // A negative total is clamped to zero before converting to `u64`.
            let total_memory = get_total_memory_bytes().max(0) as u64;
            let limit = config.scan_memory_limit.resolve(total_memory) as usize;
            QueryMemoryTracker::builder(limit, config.scan_memory_on_exhausted)
                .on_update(|usage| {
                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
                })
                .on_exhausted(|| {
                    SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
                })
                .on_reject(|| {
                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
                })
                .build()
        };

        let workers = WorkerGroup::start_for_test(
            config.clone(),
            log_store,
            object_store_manager,
            write_buffer_manager,
            listener,
            schema_metadata_manager,
            file_ref_manager,
            time_provider,
            partition_expr_fetcher,
        )
        .await?;

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            scan_memory_tracker,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };
        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }

    /// Returns the purge scheduler of the worker group.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}
1466
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    /// Builds an edit that only adds/removes the given files; every other field is `None`.
    fn files_only_edit(files_to_add: Vec<FileMeta>, files_to_remove: Vec<FileMeta>) -> RegionEdit {
        RegionEdit {
            files_to_add,
            files_to_remove,
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        }
    }

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: only "files_to_add", only "files_to_remove", or both.
        assert!(is_valid_region_edit(&files_only_edit(
            vec![FileMeta::default()],
            vec![]
        )));
        assert!(is_valid_region_edit(&files_only_edit(
            vec![],
            vec![FileMeta::default()]
        )));
        assert!(is_valid_region_edit(&files_only_edit(
            vec![FileMeta::default()],
            vec![FileMeta::default()]
        )));

        // Invalid: "files_to_add" and "files_to_remove" are both empty.
        assert!(!is_valid_region_edit(&files_only_edit(vec![], vec![])));

        // Invalid: any of the other fields is set.
        let with_window = RegionEdit {
            compaction_time_window: Some(Duration::from_secs(1)),
            ..files_only_edit(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&with_window));

        let with_entry_id = RegionEdit {
            flushed_entry_id: Some(1),
            ..files_only_edit(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&with_entry_id));

        let with_sequence = RegionEdit {
            flushed_sequence: Some(1),
            ..files_only_edit(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&with_sequence));
    }
}