// File: mito2/engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region engine.
16
17#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34pub(crate) mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59#[cfg(test)]
60mod row_selector_test;
61#[cfg(test)]
62mod scan_corrupt;
63#[cfg(test)]
64mod scan_test;
65#[cfg(test)]
66mod set_role_state_test;
67#[cfg(test)]
68mod skip_wal_test;
69#[cfg(test)]
70mod staging_test;
71#[cfg(test)]
72mod sync_test;
73#[cfg(test)]
74mod truncate_test;
75
76#[cfg(test)]
77mod copy_region_from_test;
78#[cfg(test)]
79mod remap_manifests_test;
80
81#[cfg(test)]
82mod apply_staging_manifest_test;
83#[cfg(test)]
84mod partition_filter_test;
85mod puffin_index;
86
87use std::any::Any;
88use std::collections::{HashMap, HashSet};
89use std::sync::Arc;
90use std::time::Instant;
91
92use api::region::RegionResponse;
93use async_trait::async_trait;
94use common_base::Plugins;
95use common_error::ext::BoxedError;
96use common_meta::error::UnexpectedSnafu;
97use common_meta::key::SchemaMetadataManagerRef;
98use common_recordbatch::{QueryMemoryTracker, SendableRecordBatchStream};
99use common_stat::get_total_memory_bytes;
100use common_telemetry::{info, tracing, warn};
101use common_wal::options::WalOptions;
102use futures::future::{join_all, try_join_all};
103use futures::stream::{self, Stream, StreamExt};
104use object_store::manager::ObjectStoreManagerRef;
105use snafu::{OptionExt, ResultExt, ensure};
106use store_api::ManifestVersion;
107use store_api::codec::PrimaryKeyEncoding;
108use store_api::logstore::LogStore;
109use store_api::logstore::provider::{KafkaProvider, Provider};
110use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
111use store_api::metric_engine_consts::{
112    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
113};
114use store_api::region_engine::{
115    BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine,
116    RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
117    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
118    SyncRegionFromRequest, SyncRegionFromResponse,
119};
120use store_api::region_request::{
121    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
122};
123use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
124use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
125use tokio::sync::{Semaphore, oneshot};
126
127use crate::access_layer::RegionFilePathFactory;
128use crate::cache::{CacheManagerRef, CacheStrategy};
129use crate::config::MitoConfig;
130use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
131use crate::error::{
132    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
133    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
134};
135#[cfg(feature = "enterprise")]
136use crate::extension::BoxedExtensionRangeProviderFactory;
137use crate::gc::GcLimiterRef;
138use crate::manifest::action::RegionEdit;
139use crate::memtable::MemtableStats;
140use crate::metrics::{
141    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_EXHAUSTED_TOTAL, SCAN_MEMORY_USAGE_BYTES,
142    SCAN_REQUESTS_REJECTED_TOTAL,
143};
144use crate::read::scan_region::{ScanRegion, Scanner};
145use crate::read::stream::ScanBatchStream;
146use crate::region::MitoRegionRef;
147use crate::region::opener::PartitionExprFetcherRef;
148use crate::region::options::parse_wal_options;
149use crate::request::{RegionEditRequest, WorkerRequest};
150use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
151use crate::sst::file_ref::FileReferenceManagerRef;
152use crate::sst::index::intermediate::IntermediateManager;
153use crate::sst::index::puffin_manager::PuffinManagerFactory;
154use crate::wal::entry_distributor::{
155    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
156};
157use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
158use crate::worker::WorkerGroup;
159
160pub const MITO_ENGINE_NAME: &str = "mito";
161
/// Builder for a [MitoEngine].
pub struct MitoEngineBuilder<'a, S: LogStore> {
    /// Data home directory; used to sanitize `config` in [Self::try_build].
    data_home: &'a str,
    /// Engine configuration; sanitized before the engine starts.
    config: MitoConfig,
    /// Log store backing the WAL.
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    plugins: Plugins,
    /// Optional factory for extension ranges; enterprise feature only.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
174
175impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    /// Creates a builder from all required engine components.
    ///
    /// The enterprise-only extension range provider factory starts as `None`;
    /// it can be set with `with_extension_range_provider_factory`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            partition_expr_fetcher,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }
200
    /// Sets the extension range provider factory used by the built engine,
    /// consuming and returning the builder.
    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }
212
    /// Builds the [MitoEngine], consuming the builder.
    ///
    /// Sanitizes the config against `data_home`, starts the region worker
    /// group, and wires up the scan memory tracker with its metric callbacks.
    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        // Start the workers first; they own and drive all regions.
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher.clone(),
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        // Resolve the configured scan memory limit against total host memory.
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker =
            QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
                .on_update(|usage| {
                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
                })
                .on_exhausted(|| {
                    SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
                })
                .on_reject(|| {
                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
                })
                .build();

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            scan_memory_tracker,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        // Attach the enterprise-only extension range provider factory, if any.
        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
260}
261
/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
    /// Shared engine internals; cloning the engine only bumps this `Arc`.
    inner: Arc<EngineInner>,
}
267
268impl MitoEngine {
269    /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
270    #[allow(clippy::too_many_arguments)]
271    pub async fn new<S: LogStore>(
272        data_home: &str,
273        config: MitoConfig,
274        log_store: Arc<S>,
275        object_store_manager: ObjectStoreManagerRef,
276        schema_metadata_manager: SchemaMetadataManagerRef,
277        file_ref_manager: FileReferenceManagerRef,
278        partition_expr_fetcher: PartitionExprFetcherRef,
279        plugins: Plugins,
280    ) -> Result<MitoEngine> {
281        let builder = MitoEngineBuilder::new(
282            data_home,
283            config,
284            log_store,
285            object_store_manager,
286            schema_metadata_manager,
287            file_ref_manager,
288            partition_expr_fetcher,
289            plugins,
290        );
291        builder.try_build().await
292    }
293
    /// Returns the engine's [MitoConfig].
    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }
297
    /// Returns the cache manager shared by the worker group.
    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }
301
    /// Returns the file reference manager shared by the worker group.
    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }
305
    /// Returns the GC limiter shared by the worker group.
    pub fn gc_limiter(&self) -> GcLimiterRef {
        self.inner.workers.gc_limiter()
    }
309
    /// Returns the object store manager shared by the worker group.
    pub fn object_store_manager(&self) -> &ObjectStoreManagerRef {
        self.inner.workers.object_store_manager()
    }
313
    /// Returns the puffin manager factory shared by the worker group.
    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        self.inner.workers.puffin_manager_factory()
    }
317
    /// Returns the intermediate (index build) manager shared by the worker group.
    pub fn intermediate_manager(&self) -> &IntermediateManager {
        self.inner.workers.intermediate_manager()
    }
321
    /// Returns the schema metadata manager shared by the worker group.
    pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
        self.inner.workers.schema_metadata_manager()
    }
325
326    /// Get all tmp ref files for given region ids, excluding files that's already in manifest.
327    pub async fn get_snapshot_of_file_refs(
328        &self,
329        file_handle_regions: impl IntoIterator<Item = RegionId>,
330        related_regions: HashMap<RegionId, HashSet<RegionId>>,
331    ) -> Result<FileRefsManifest> {
332        let file_ref_mgr = self.file_ref_manager();
333
334        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
335        // Convert region IDs to MitoRegionRef objects, ignore regions that do not exist on current datanode
336        // as regions on other datanodes are not managed by this engine.
337        let query_regions: Vec<MitoRegionRef> = file_handle_regions
338            .into_iter()
339            .filter_map(|region_id| self.find_region(region_id))
340            .collect();
341
342        let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
343            let dst2src = related_regions
344                .into_iter()
345                .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
346                .fold(
347                    HashMap::<RegionId, HashSet<RegionId>>::new(),
348                    |mut acc, (k, v)| {
349                        let entry = acc.entry(k).or_default();
350                        entry.insert(v);
351                        acc
352                    },
353                );
354            let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
355            for (dst_region, srcs) in dst2src {
356                let Some(dst_region) = self.find_region(dst_region) else {
357                    continue;
358                };
359                dst_region_to_src_regions.push((dst_region, srcs));
360            }
361            dst_region_to_src_regions
362        };
363
364        file_ref_mgr
365            .get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
366            .await
367    }
368
    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }
373
    /// Returns true if the specific region is opening.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }
378
    /// Returns true if the specific region is catching up.
    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_catching_up(region_id)
    }
383
    /// Returns the region disk/memory statistic, or `None` if the region is
    /// not found on this node.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }
389
    /// Returns primary key encoding of the region, or `None` if the region is
    /// not found on this node.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }
395
    /// Handle substrait query and return a stream of record batches
    ///
    /// Notice that the output stream's ordering is not guaranteed. If order
    /// matters, please use [`scanner`] to build a [`Scanner`] to consume.
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }
412
    /// Scan [`Batch`]es by [`ScanRequest`].
    ///
    /// `filter_deleted` controls whether deleted rows are filtered out of the
    /// returned batch stream.
    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }
424
    /// Returns a scanner to scan for `request`.
    pub(crate) async fn scanner(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }
433
    /// Scans a region, returning the [ScanRegion] used to build scanners.
    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }
439
    /// Edit region's metadata by [RegionEdit] directly. Use with care.
    /// Now we only allow adding files or removing files from region (the [RegionEdit] struct can only contain a non-empty "files_to_add" or "files_to_remove" field).
    /// Other region editing intention will result in an "invalid request" error.
    /// Also note that if a region is to be edited directly, we MUST not write data to it thereafter.
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        // Reject edits that touch anything beyond the file lists.
        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        // Deliver the edit to the region's worker and wait for the outcome.
        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }
469
    /// Handles copy region from request.
    ///
    /// This method is only supported for internal use and is not exposed in the trait implementation.
    pub async fn copy_region_from(
        &self,
        region_id: RegionId,
        request: MitoCopyRegionFromRequest,
    ) -> Result<MitoCopyRegionFromResponse> {
        self.inner.copy_region_from(region_id, request).await
    }
480
    /// Test-only alias of [Self::find_region].
    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }
485
    /// Returns the region with the given id, or `None` if the workers don't
    /// hold it.
    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }
489
    /// Returns all regions.
    pub fn regions(&self) -> Vec<MitoRegionRef> {
        self.inner.workers.all_regions().collect()
    }
494
495    fn encode_manifest_info_to_extensions(
496        region_id: &RegionId,
497        manifest_info: RegionManifestInfo,
498        extensions: &mut HashMap<String, Vec<u8>>,
499    ) -> Result<()> {
500        let region_manifest_info = vec![(*region_id, manifest_info)];
501
502        extensions.insert(
503            MANIFEST_INFO_EXTENSION_KEY.to_string(),
504            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
505        );
506        info!(
507            "Added manifest info: {:?} to extensions, region_id: {:?}",
508            region_manifest_info, region_id
509        );
510        Ok(())
511    }
512
    /// Encodes `column_metadatas` into `extensions` under
    /// [TABLE_COLUMN_METADATA_EXTENSION_KEY].
    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }
528
529    /// Find the current version's memtables and SSTs stats by region_id.
530    /// The stats must be collected in one place one time to ensure data consistency.
531    pub fn find_memtable_and_sst_stats(
532        &self,
533        region_id: RegionId,
534    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
535        let region = self
536            .find_region(region_id)
537            .context(RegionNotFoundSnafu { region_id })?;
538
539        let version = region.version();
540        let memtable_stats = version
541            .memtables
542            .list_memtables()
543            .iter()
544            .map(|x| x.stats())
545            .collect::<Vec<_>>();
546
547        let sst_stats = version
548            .ssts
549            .levels()
550            .iter()
551            .flat_map(|level| level.files().map(|x| x.meta_ref()))
552            .cloned()
553            .collect::<Vec<_>>();
554        Ok((memtable_stats, sst_stats))
555    }
556
557    /// Lists all SSTs from the manifest of all regions in the engine.
558    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
559        let node_id = self.inner.workers.file_ref_manager().node_id();
560        let regions = self.inner.workers.all_regions();
561
562        let mut results = Vec::new();
563        for region in regions {
564            let mut entries = region.manifest_sst_entries().await;
565            for e in &mut entries {
566                e.node_id = node_id;
567            }
568            results.extend(entries);
569        }
570
571        results
572    }
573
    /// Lists metadata about all puffin index targets stored in the engine.
    ///
    /// For each region, reads the manifest SST entries and, for entries that
    /// have an index file, inspects the puffin file to collect per-target
    /// metadata. Entries without an index file or with an unparsable file id
    /// are skipped. Puffin files are inspected concurrently (8 at a time per
    /// region).
    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let cache_manager = self.inner.workers.cache_manager();
        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();

        let mut results = Vec::new();

        for region in self.inner.workers.all_regions() {
            let manifest_entries = region.manifest_sst_entries().await;
            let access_layer = region.access_layer.clone();
            let table_dir = access_layer.table_dir().to_string();
            let path_type = access_layer.path_type();
            let object_store = access_layer.object_store().clone();
            let puffin_factory = access_layer.puffin_manager_factory().clone();
            let path_factory = RegionFilePathFactory::new(table_dir, path_type);

            // Build one future per manifest entry; each clones the shared
            // handles it needs so the futures are independent.
            let entry_futures = manifest_entries.into_iter().map(|entry| {
                let object_store = object_store.clone();
                let path_factory = path_factory.clone();
                let puffin_factory = puffin_factory.clone();
                let puffin_metadata_cache = puffin_metadata_cache.clone();
                let bloom_filter_cache = bloom_filter_cache.clone();
                let inverted_index_cache = inverted_index_cache.clone();

                async move {
                    // Entries without an index file contribute nothing.
                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
                        return Vec::new();
                    };

                    let index_version = entry.index_version;
                    let file_id = match FileId::parse_str(&entry.file_id) {
                        Ok(file_id) => file_id,
                        Err(err) => {
                            // Log and skip instead of failing the whole listing.
                            warn!(
                                err;
                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                                entry.table_dir,
                                entry.file_id
                            );
                            return Vec::new();
                        }
                    };
                    let region_index_id = RegionIndexId::new(
                        RegionFileId::new(entry.region_id, file_id),
                        index_version,
                    );
                    let context = IndexEntryContext {
                        table_dir: &entry.table_dir,
                        index_file_path: index_file_path.as_str(),
                        region_id: entry.region_id,
                        table_id: entry.table_id,
                        region_number: entry.region_number,
                        region_group: entry.region_group,
                        region_sequence: entry.region_sequence,
                        file_id: &entry.file_id,
                        index_file_size: entry.index_file_size,
                        node_id,
                    };

                    let manager = puffin_factory
                        .build(object_store, path_factory)
                        .with_puffin_metadata_cache(puffin_metadata_cache);

                    collect_index_entries_from_puffin(
                        manager,
                        region_index_id,
                        context,
                        bloom_filter_cache,
                        inverted_index_cache,
                    )
                    .await
                }
            });

            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8.
            while let Some(mut metas) = meta_stream.next().await {
                results.append(&mut metas);
            }
        }

        results
    }
659
660    /// Lists all SSTs from the storage layer of all regions in the engine.
661    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
662        let node_id = self.inner.workers.file_ref_manager().node_id();
663        let regions = self.inner.workers.all_regions();
664
665        let mut layers_distinct_table_dirs = HashMap::new();
666        for region in regions {
667            let table_dir = region.access_layer.table_dir();
668            if !layers_distinct_table_dirs.contains_key(table_dir) {
669                layers_distinct_table_dirs
670                    .insert(table_dir.to_string(), region.access_layer.clone());
671            }
672        }
673
674        stream::iter(layers_distinct_table_dirs)
675            .map(|(_, access_layer)| access_layer.storage_sst_entries())
676            .flatten()
677            .map(move |entry| {
678                entry.map(move |mut entry| {
679                    entry.node_id = node_id;
680                    entry
681                })
682            })
683    }
684}
685
686/// Check whether the region edit is valid.
687///
688/// Only adding or removing files to region is considered valid now.
689fn is_valid_region_edit(edit: &RegionEdit) -> bool {
690    (!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty())
691        && matches!(
692            edit,
693            RegionEdit {
694                files_to_add: _,
695                files_to_remove: _,
696                timestamp_ms: _,
697                compaction_time_window: None,
698                flushed_entry_id: None,
699                flushed_sequence: None,
700                ..
701            }
702        )
703}
704
/// Inner struct of [MitoEngine].
struct EngineInner {
    /// Region workers group.
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// The Wal raw entry reader.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    /// Memory tracker for table scans.
    scan_memory_tracker: QueryMemoryTracker,
    /// Factory of extension range providers; enterprise feature only.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
718
/// Region open requests grouped by Kafka topic name.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
720
721/// Returns requests([TopicGroupedRegionOpenRequests]) grouped by topic and remaining requests.
722fn prepare_batch_open_requests(
723    requests: Vec<(RegionId, RegionOpenRequest)>,
724) -> Result<(
725    TopicGroupedRegionOpenRequests,
726    Vec<(RegionId, RegionOpenRequest)>,
727)> {
728    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
729    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
730    for (region_id, request) in requests {
731        match parse_wal_options(&request.options).context(SerdeJsonSnafu)? {
732            WalOptions::Kafka(options) => {
733                topic_to_regions
734                    .entry(options.topic)
735                    .or_default()
736                    .push((region_id, request));
737            }
738            WalOptions::RaftEngine | WalOptions::Noop => {
739                remaining_regions.push((region_id, request));
740            }
741        }
742    }
743
744    Ok((topic_to_regions, remaining_regions))
745}
746
747impl EngineInner {
    /// Replaces the extension range provider factory, consuming and returning
    /// the inner engine.
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }
759
    /// Stop the inner engine by stopping its worker group.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }
764
    /// Returns the region, or a region-not-found error if the workers don't
    /// hold it.
    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }
770
    /// Get metadata of a region.
    ///
    /// Returns error if the region doesn't exist.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }
779
    /// Opens all regions that share one Kafka `topic`, replaying the topic's
    /// WAL through a single distributor so the raw entries are read only once.
    ///
    /// Returns one result per region, paired with its id in the same order as
    /// `region_requests`.
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        // One distributor reads the topic and fans entries out to a receiver
        // per region.
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        // Submit an open request (with its WAL entry receiver) per region.
        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Waits for entries distribution.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Waits for worker returns.
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            // Safety: provider is kafka provider.
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }
826
    /// Opens many regions at once, limiting concurrency to `parallelism`
    /// permits via a semaphore.
    ///
    /// Kafka WAL regions are grouped by topic and opened through
    /// [Self::open_topic_regions] so each topic is replayed once; the
    /// remaining regions are opened individually. Returns one result per
    /// region.
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            // One task per topic; each task opens all of that topic's regions.
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            // One task per non-Kafka region; no shared WAL replay needed.
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }
876
    /// Catches up all regions that replay WAL entries from the same topic.
    ///
    /// Builds one WAL entry distributor for the topic, submits a catchup request
    /// (with its dedicated entry receiver) to each region's worker, then drives
    /// entry distribution concurrently with waiting for the workers. Returns one
    /// result per region, paired with the region id in input order.
    async fn catchup_topic_regions(
        &self,
        provider: Provider,
        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        // Creates one bounded entry receiver per region, in `region_ids` order.
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        // Submits all requests first; the workers consume entries pushed by the
        // distributor spawned below.
        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Wait for entries distribution.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Wait for worker returns.
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            // Safety: provider is kafka provider.
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );

        Ok(region_ids.into_iter().zip(responses).collect())
    }
923
924    async fn handle_batch_catchup_requests(
925        &self,
926        parallelism: usize,
927        requests: Vec<(RegionId, RegionCatchupRequest)>,
928    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
929        let mut responses = Vec::with_capacity(requests.len());
930        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
931        let mut remaining_region_requests = vec![];
932
933        for (region_id, request) in requests {
934            match self.workers.get_region(region_id) {
935                Some(region) => match region.provider.as_kafka_provider() {
936                    Some(provider) => {
937                        topic_regions
938                            .entry(provider.clone())
939                            .or_default()
940                            .push((region_id, request));
941                    }
942                    None => {
943                        remaining_region_requests.push((region_id, request));
944                    }
945                },
946                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
947            }
948        }
949
950        let semaphore = Arc::new(Semaphore::new(parallelism));
951
952        if !topic_regions.is_empty() {
953            let mut tasks = Vec::with_capacity(topic_regions.len());
954            for (provider, region_requests) in topic_regions {
955                let semaphore_moved = semaphore.clone();
956                tasks.push(async move {
957                    // Safety: semaphore must exist
958                    let _permit = semaphore_moved.acquire().await.unwrap();
959                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
960                        .await
961                })
962            }
963
964            let r = try_join_all(tasks).await?;
965            responses.extend(r.into_iter().flatten());
966        }
967
968        if !remaining_region_requests.is_empty() {
969            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
970            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
971            for (region_id, request) in remaining_region_requests {
972                let semaphore_moved = semaphore.clone();
973                region_ids.push(region_id);
974                tasks.push(async move {
975                    // Safety: semaphore must exist
976                    let _permit = semaphore_moved.acquire().await.unwrap();
977                    let (request, receiver) =
978                        WorkerRequest::new_catchup_region_request(region_id, request, None);
979
980                    self.workers.submit_to_worker(region_id, request).await?;
981
982                    receiver.await.context(RecvSnafu)?
983                })
984            }
985
986            let results = join_all(tasks).await;
987            responses.extend(region_ids.into_iter().zip(results));
988        }
989
990        Ok(responses)
991    }
992
993    /// Handles [RegionRequest] and return its executed result.
994    async fn handle_request(
995        &self,
996        region_id: RegionId,
997        request: RegionRequest,
998    ) -> Result<AffectedRows> {
999        let region_metadata = self.get_metadata(region_id).ok();
1000        let (request, receiver) =
1001            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
1002        self.workers.submit_to_worker(region_id, request).await?;
1003
1004        receiver.await.context(RecvSnafu)?
1005    }
1006
1007    /// Returns the sequence of latest committed data.
1008    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
1009        // Reading a region doesn't need to go through the region worker thread.
1010        self.find_region(region_id)
1011            .map(|r| r.find_committed_sequence())
1012    }
1013
1014    /// Handles the scan `request` and returns a [ScanRegion].
1015    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
1016    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
1017        let query_start = Instant::now();
1018        // Reading a region doesn't need to go through the region worker thread.
1019        let region = self.find_region(region_id)?;
1020        let version = region.version();
1021        // Get cache.
1022        let cache_manager = self.workers.cache_manager();
1023
1024        let scan_region = ScanRegion::new(
1025            version,
1026            region.access_layer.clone(),
1027            request,
1028            CacheStrategy::EnableAll(cache_manager),
1029        )
1030        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
1031        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
1032        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
1033        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
1034        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
1035        .with_start_time(query_start);
1036
1037        #[cfg(feature = "enterprise")]
1038        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
1039
1040        Ok(scan_region)
1041    }
1042
1043    #[cfg(feature = "enterprise")]
1044    fn maybe_fill_extension_range_provider(
1045        &self,
1046        mut scan_region: ScanRegion,
1047        region: MitoRegionRef,
1048    ) -> ScanRegion {
1049        if region.is_follower()
1050            && let Some(factory) = self.extension_range_provider_factory.as_ref()
1051        {
1052            scan_region
1053                .set_extension_range_provider(factory.create_extension_range_provider(region));
1054        }
1055        scan_region
1056    }
1057
1058    /// Converts the [`RegionRole`].
1059    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
1060        let region = self.find_region(region_id)?;
1061        region.set_role(role);
1062        Ok(())
1063    }
1064
1065    /// Sets read-only for a region and ensures no more writes in the region after it returns.
1066    async fn set_region_role_state_gracefully(
1067        &self,
1068        region_id: RegionId,
1069        region_role_state: SettableRegionRoleState,
1070    ) -> Result<SetRegionRoleStateResponse> {
1071        // Notes: It acquires the mutable ownership to ensure no other threads,
1072        // Therefore, we submit it to the worker.
1073        let (request, receiver) =
1074            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
1075        self.workers.submit_to_worker(region_id, request).await?;
1076
1077        receiver.await.context(RecvSnafu)
1078    }
1079
1080    async fn sync_region(
1081        &self,
1082        region_id: RegionId,
1083        manifest_info: RegionManifestInfo,
1084    ) -> Result<(ManifestVersion, bool)> {
1085        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
1086        let manifest_version = manifest_info.data_manifest_version();
1087        let (request, receiver) =
1088            WorkerRequest::new_sync_region_request(region_id, manifest_version);
1089        self.workers.submit_to_worker(region_id, request).await?;
1090
1091        receiver.await.context(RecvSnafu)?
1092    }
1093
1094    async fn remap_manifests(
1095        &self,
1096        request: RemapManifestsRequest,
1097    ) -> Result<RemapManifestsResponse> {
1098        let region_id = request.region_id;
1099        let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
1100        self.workers.submit_to_worker(region_id, request).await?;
1101        let manifest_paths = receiver.await.context(RecvSnafu)??;
1102        Ok(RemapManifestsResponse { manifest_paths })
1103    }
1104
1105    async fn copy_region_from(
1106        &self,
1107        region_id: RegionId,
1108        request: MitoCopyRegionFromRequest,
1109    ) -> Result<MitoCopyRegionFromResponse> {
1110        let (request, receiver) =
1111            WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
1112        self.workers.submit_to_worker(region_id, request).await?;
1113        let response = receiver.await.context(RecvSnafu)??;
1114        Ok(response)
1115    }
1116
1117    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1118        self.workers.get_region(region_id).map(|region| {
1119            if region.is_follower() {
1120                RegionRole::Follower
1121            } else {
1122                RegionRole::Leader
1123            }
1124        })
1125    }
1126}
1127
1128fn map_batch_responses(responses: Vec<(RegionId, Result<AffectedRows>)>) -> BatchResponses {
1129    responses
1130        .into_iter()
1131        .map(|(region_id, response)| {
1132            (
1133                region_id,
1134                response.map(RegionResponse::new).map_err(BoxedError::new),
1135            )
1136        })
1137        .collect()
1138}
1139
#[async_trait]
impl RegionEngine for MitoEngine {
    /// Returns the name of the mito engine.
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    /// Opens a batch of regions with bounded parallelism.
    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        // TODO(weny): add metrics.
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(map_batch_responses)
            .map_err(BoxedError::new)
    }

    /// Catches up a batch of regions with bounded parallelism.
    #[tracing::instrument(skip_all)]
    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
            .map(map_batch_responses)
            .map_err(BoxedError::new)
    }

    /// Handles a region request; alter/create responses are additionally
    /// enriched with region metadata extensions.
    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        // Remembers the request kind before `request` is consumed below, so the
        // matching response handler can run afterwards.
        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    /// Builds a region scanner for the scan request.
    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    /// Returns the engine's scan memory tracker.
    fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
        Some(self.inner.scan_memory_tracker.clone())
    }

    /// Returns the sequence of the latest committed data of the region.
    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError> {
        self.inner
            .get_committed_sequence(region_id)
            .map_err(BoxedError::new)
    }

    /// Retrieve region's metadata.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stop the engine.
    ///
    /// Stopping the engine doesn't stop the underlying log store as other components might
    /// still use it. (When no other components are referencing the log store, it will
    /// automatically shutdown.)
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    /// Returns the statistic of the region, if it exists.
    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    /// Sets the [RegionRole] of the region.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    /// Sets the region role state gracefully through the region worker.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    /// Syncs the region to the manifest version carried by the request.
    async fn sync_region(
        &self,
        region_id: RegionId,
        request: SyncRegionFromRequest,
    ) -> Result<SyncRegionFromResponse, BoxedError> {
        // Only manifest-info based sync requests are supported here.
        let manifest_info = request
            .into_region_manifest_info()
            .context(UnexpectedSnafu {
                err_msg: "Expected a manifest info request",
            })
            .map_err(BoxedError::new)?;
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncRegionFromResponse::Mito { synced })
    }

    /// Remaps the manifests of the region.
    async fn remap_manifests(
        &self,
        request: RemapManifestsRequest,
    ) -> Result<RemapManifestsResponse, BoxedError> {
        self.inner
            .remap_manifests(request)
            .await
            .map_err(BoxedError::new)
    }

    /// Returns the role of the region, if the region exists.
    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
1309
1310impl MitoEngine {
1311    fn handle_alter_response(
1312        &self,
1313        region_id: RegionId,
1314        response: &mut RegionResponse,
1315    ) -> Result<()> {
1316        if let Some(statistic) = self.region_statistic(region_id) {
1317            Self::encode_manifest_info_to_extensions(
1318                &region_id,
1319                statistic.manifest,
1320                &mut response.extensions,
1321            )?;
1322        }
1323        let column_metadatas = self
1324            .inner
1325            .find_region(region_id)
1326            .ok()
1327            .map(|r| r.metadata().column_metadatas.clone());
1328        if let Some(column_metadatas) = column_metadatas {
1329            Self::encode_column_metadatas_to_extensions(
1330                &region_id,
1331                column_metadatas,
1332                &mut response.extensions,
1333            )?;
1334        }
1335        Ok(())
1336    }
1337
1338    fn handle_create_response(
1339        &self,
1340        region_id: RegionId,
1341        response: &mut RegionResponse,
1342    ) -> Result<()> {
1343        let column_metadatas = self
1344            .inner
1345            .find_region(region_id)
1346            .ok()
1347            .map(|r| r.metadata().column_metadatas.clone());
1348        if let Some(column_metadatas) = column_metadatas {
1349            Self::encode_column_metadatas_to_extensions(
1350                &region_id,
1351                column_metadatas,
1352                &mut response.extensions,
1353            )?;
1354        }
1355        Ok(())
1356    }
1357}
1358
// Tests methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;
        let config = Arc::new(config);

        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));

        // Resolves the scan memory budget against total memory and wires the
        // tracker callbacks into the scan metrics.
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker =
            QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
                .on_update(|usage| {
                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
                })
                .on_exhausted(|| {
                    SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
                })
                .on_reject(|| {
                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
                })
                .build();

        let workers = WorkerGroup::start_for_test(
            config.clone(),
            log_store,
            object_store_manager,
            write_buffer_manager,
            listener,
            schema_metadata_manager,
            file_ref_manager,
            time_provider,
            partition_expr_fetcher,
        )
        .await?;

        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers,
                config,
                wal_raw_entry_reader,
                scan_memory_tracker,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    /// Returns the purge scheduler.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}
1422
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    /// Builds a [RegionEdit] with the given file lists and every other field unset.
    fn files_only_edit(
        files_to_add: Vec<FileMeta>,
        files_to_remove: Vec<FileMeta>,
    ) -> RegionEdit {
        RegionEdit {
            files_to_add,
            files_to_remove,
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        }
    }

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: has only "files_to_add"
        let edit = files_only_edit(vec![FileMeta::default()], vec![]);
        assert!(is_valid_region_edit(&edit));

        // Invalid: "files_to_add" and "files_to_remove" are both empty
        let edit = files_only_edit(vec![], vec![]);
        assert!(!is_valid_region_edit(&edit));

        // Valid: "files_to_remove" is not empty
        let edit = files_only_edit(vec![FileMeta::default()], vec![FileMeta::default()]);
        assert!(is_valid_region_edit(&edit));

        // Invalid: other fields are not all "None"s
        let edit = RegionEdit {
            compaction_time_window: Some(Duration::from_secs(1)),
            ..files_only_edit(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            flushed_entry_id: Some(1),
            ..files_only_edit(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            flushed_sequence: Some(1),
            ..files_only_edit(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&edit));
    }
}