mito2/engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region engine.
16
17#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34pub(crate) mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59#[cfg(test)]
60mod row_selector_test;
61#[cfg(test)]
62mod scan_corrupt;
63#[cfg(test)]
64mod scan_test;
65#[cfg(test)]
66mod set_role_state_test;
67#[cfg(test)]
68mod skip_wal_test;
69#[cfg(test)]
70mod staging_test;
71#[cfg(test)]
72mod sync_test;
73#[cfg(test)]
74mod truncate_test;
75
76#[cfg(test)]
77mod copy_region_from_test;
78#[cfg(test)]
79mod remap_manifests_test;
80
81#[cfg(test)]
82mod apply_staging_manifest_test;
83#[cfg(test)]
84mod partition_filter_test;
85mod puffin_index;
86
87use std::any::Any;
88use std::collections::{HashMap, HashSet};
89use std::sync::Arc;
90use std::time::Instant;
91
92use api::region::RegionResponse;
93use async_trait::async_trait;
94use common_base::Plugins;
95use common_error::ext::BoxedError;
96use common_meta::error::UnexpectedSnafu;
97use common_meta::key::SchemaMetadataManagerRef;
98use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
99use common_stat::get_total_memory_bytes;
100use common_telemetry::{info, tracing, warn};
101use common_wal::options::WalOptions;
102use futures::future::{join_all, try_join_all};
103use futures::stream::{self, Stream, StreamExt};
104use object_store::manager::ObjectStoreManagerRef;
105use snafu::{OptionExt, ResultExt, ensure};
106use store_api::ManifestVersion;
107use store_api::codec::PrimaryKeyEncoding;
108use store_api::logstore::LogStore;
109use store_api::logstore::provider::{KafkaProvider, Provider};
110use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
111use store_api::metric_engine_consts::{
112    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
113};
114use store_api::region_engine::{
115    BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine,
116    RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
117    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
118    SyncRegionFromRequest, SyncRegionFromResponse,
119};
120use store_api::region_request::{
121    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
122};
123use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
124use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
125use tokio::sync::{Semaphore, oneshot};
126
127use crate::access_layer::RegionFilePathFactory;
128use crate::cache::{CacheManagerRef, CacheStrategy};
129use crate::config::MitoConfig;
130use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
131use crate::error::{
132    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
133    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
134};
135#[cfg(feature = "enterprise")]
136use crate::extension::BoxedExtensionRangeProviderFactory;
137use crate::gc::GcLimiterRef;
138use crate::manifest::action::RegionEdit;
139use crate::memtable::MemtableStats;
140use crate::metrics::{
141    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
142};
143use crate::read::scan_region::{ScanRegion, Scanner};
144use crate::read::stream::ScanBatchStream;
145use crate::region::MitoRegionRef;
146use crate::region::opener::PartitionExprFetcherRef;
147use crate::region::options::parse_wal_options;
148use crate::request::{RegionEditRequest, WorkerRequest};
149use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
150use crate::sst::file_ref::FileReferenceManagerRef;
151use crate::sst::index::intermediate::IntermediateManager;
152use crate::sst::index::puffin_manager::PuffinManagerFactory;
153use crate::wal::entry_distributor::{
154    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
155};
156use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
157use crate::worker::WorkerGroup;
158
159pub const MITO_ENGINE_NAME: &str = "mito";
160
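/// Builder for [MitoEngine].
///
/// Optional extensions (e.g. the enterprise extension range provider factory)
/// can be attached before calling [MitoEngineBuilder::try_build].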
161pub struct MitoEngineBuilder<'a, S: LogStore> {
162    data_home: &'a str,
163    config: MitoConfig,
164    log_store: Arc<S>,
165    object_store_manager: ObjectStoreManagerRef,
166    schema_metadata_manager: SchemaMetadataManagerRef,
167    file_ref_manager: FileReferenceManagerRef,
168    partition_expr_fetcher: PartitionExprFetcherRef,
169    plugins: Plugins,
170    max_concurrent_queries: usize,
171    #[cfg(feature = "enterprise")]
172    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
173}
174
175impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
176    #[allow(clippy::too_many_arguments)]
177    pub fn new(
178        data_home: &'a str,
179        config: MitoConfig,
180        log_store: Arc<S>,
181        object_store_manager: ObjectStoreManagerRef,
182        schema_metadata_manager: SchemaMetadataManagerRef,
183        file_ref_manager: FileReferenceManagerRef,
184        partition_expr_fetcher: PartitionExprFetcherRef,
185        plugins: Plugins,
186        max_concurrent_queries: usize,
187    ) -> Self {
188        Self {
189            data_home,
190            config,
191            log_store,
192            object_store_manager,
193            schema_metadata_manager,
194            file_ref_manager,
195            plugins,
196            partition_expr_fetcher,
197            max_concurrent_queries,
198            #[cfg(feature = "enterprise")]
199            extension_range_provider_factory: None,
200        }
201    }
202
203    #[cfg(feature = "enterprise")]
204    #[must_use]
205    pub fn with_extension_range_provider_factory(
206        self,
207        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
208    ) -> Self {
209        Self {
210            extension_range_provider_factory,
211            ..self
212        }
213    }
214
215    pub async fn try_build(mut self) -> Result<MitoEngine> {
216        self.config.sanitize(self.data_home)?;
217
218        let config = Arc::new(self.config);
219        let workers = WorkerGroup::start(
220            config.clone(),
221            self.log_store.clone(),
222            self.object_store_manager,
223            self.schema_metadata_manager,
224            self.file_ref_manager,
225            self.partition_expr_fetcher.clone(),
226            self.plugins,
227        )
228        .await?;
229        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
230        let total_memory = get_total_memory_bytes().max(0) as u64;
231        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
232        let scan_memory_tracker =
233            QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
234                .with_on_update(|usage| {
235                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
236                })
237                .with_on_reject(|| {
238                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
239                });
240
241        let inner = EngineInner {
242            workers,
243            config,
244            wal_raw_entry_reader,
245            scan_memory_tracker,
246            #[cfg(feature = "enterprise")]
247            extension_range_provider_factory: None,
248        };
249
250        #[cfg(feature = "enterprise")]
251        let inner =
252            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
253
254        Ok(MitoEngine {
255            inner: Arc::new(inner),
256        })
257    }
258}
259
260/// Region engine implementation for timeseries data.
261#[derive(Clone)]
262pub struct MitoEngine {
263    inner: Arc<EngineInner>,
264}
265
266impl MitoEngine {
267    /// Returns a new [MitoEngine] with the given `config`, `log_store` and `object_store_manager`.
268    #[allow(clippy::too_many_arguments)]
269    pub async fn new<S: LogStore>(
270        data_home: &str,
271        config: MitoConfig,
272        log_store: Arc<S>,
273        object_store_manager: ObjectStoreManagerRef,
274        schema_metadata_manager: SchemaMetadataManagerRef,
275        file_ref_manager: FileReferenceManagerRef,
276        partition_expr_fetcher: PartitionExprFetcherRef,
277        plugins: Plugins,
278    ) -> Result<MitoEngine> {
279        let builder = MitoEngineBuilder::new(
280            data_home,
281            config,
282            log_store,
283            object_store_manager,
284            schema_metadata_manager,
285            file_ref_manager,
286            partition_expr_fetcher,
287            plugins,
288            0, // Default: no limit on concurrent queries
289        );
290        builder.try_build().await
291    }
292
293    pub fn mito_config(&self) -> &MitoConfig {
294        &self.inner.config
295    }
296
297    pub fn cache_manager(&self) -> CacheManagerRef {
298        self.inner.workers.cache_manager()
299    }
300
301    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
302        self.inner.workers.file_ref_manager()
303    }
304
305    pub fn gc_limiter(&self) -> GcLimiterRef {
306        self.inner.workers.gc_limiter()
307    }
308
309    pub fn object_store_manager(&self) -> &ObjectStoreManagerRef {
310        self.inner.workers.object_store_manager()
311    }
312
313    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
314        self.inner.workers.puffin_manager_factory()
315    }
316
317    pub fn intermediate_manager(&self) -> &IntermediateManager {
318        self.inner.workers.intermediate_manager()
319    }
320
321    pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
322        self.inner.workers.schema_metadata_manager()
323    }
324
325    /// Gets all temporary ref files for the given region ids, excluding files that are already in the manifest.
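    ///
    /// A minimal sketch of usage; the region id below is hypothetical:
    ///
    /// ```ignore
    /// // Snapshot the temporary file references of a single region,
    /// // with no related destination regions.
    /// let refs = engine
    ///     .get_snapshot_of_file_refs([RegionId::new(1024, 0)], HashMap::new())
    ///     .await?;
    /// ```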
326    pub async fn get_snapshot_of_file_refs(
327        &self,
328        file_handle_regions: impl IntoIterator<Item = RegionId>,
329        related_regions: HashMap<RegionId, HashSet<RegionId>>,
330    ) -> Result<FileRefsManifest> {
331        let file_ref_mgr = self.file_ref_manager();
332
333        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
334        // Convert region IDs to MitoRegionRef objects, ignoring regions that do not exist on the
335        // current datanode, as regions on other datanodes are not managed by this engine.
336        let query_regions: Vec<MitoRegionRef> = file_handle_regions
337            .into_iter()
338            .filter_map(|region_id| self.find_region(region_id))
339            .collect();
340
341        let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
342            let dst2src = related_regions
343                .into_iter()
344                .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
345                .fold(
346                    HashMap::<RegionId, HashSet<RegionId>>::new(),
347                    |mut acc, (k, v)| {
348                        let entry = acc.entry(k).or_default();
349                        entry.insert(v);
350                        acc
351                    },
352                );
353            let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
354            for (dst_region, srcs) in dst2src {
355                let Some(dst_region) = self.find_region(dst_region) else {
356                    continue;
357                };
358                dst_region_to_src_regions.push((dst_region, srcs));
359            }
360            dst_region_to_src_regions
361        };
362
363        file_ref_mgr
364            .get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
365            .await
366    }
367
368    /// Returns true if the specific region exists.
369    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
370        self.inner.workers.is_region_exists(region_id)
371    }
372
373    /// Returns true if the specific region is opening.
374    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
375        self.inner.workers.is_region_opening(region_id)
376    }
377
378    /// Returns true if the specific region is catching up.
379    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
380        self.inner.workers.is_region_catching_up(region_id)
381    }
382
383    /// Returns the region disk/memory statistic.
384    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
385        self.find_region(region_id)
386            .map(|region| region.region_statistic())
387    }
388
389    /// Returns primary key encoding of the region.
390    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
391        self.find_region(region_id)
392            .map(|r| r.primary_key_encoding())
393    }
394
395    /// Handles a substrait query and returns a stream of record batches.
396    ///
397    /// Note that the output stream's ordering is not guaranteed. If order
398    /// matters, please use [`scanner`] to build a [`Scanner`] to consume it.
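    ///
    /// A sketch of usage, assuming an open region `region_id` and that a
    /// default [`ScanRequest`] performs a full scan:
    ///
    /// ```ignore
    /// let stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// // Consume the stream, e.g. via `StreamExt::next`.
    /// ```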
399    #[tracing::instrument(skip_all)]
400    pub async fn scan_to_stream(
401        &self,
402        region_id: RegionId,
403        request: ScanRequest,
404    ) -> Result<SendableRecordBatchStream, BoxedError> {
405        self.scanner(region_id, request)
406            .await
407            .map_err(BoxedError::new)?
408            .scan()
409            .await
410    }
411
412    /// Scan [`Batch`]es by [`ScanRequest`].
413    pub async fn scan_batch(
414        &self,
415        region_id: RegionId,
416        request: ScanRequest,
417        filter_deleted: bool,
418    ) -> Result<ScanBatchStream> {
419        let mut scan_region = self.scan_region(region_id, request)?;
420        scan_region.set_filter_deleted(filter_deleted);
421        scan_region.scanner().await?.scan_batch()
422    }
423
424    /// Returns a scanner to scan for `request`.
425    pub(crate) async fn scanner(
426        &self,
427        region_id: RegionId,
428        request: ScanRequest,
429    ) -> Result<Scanner> {
430        self.scan_region(region_id, request)?.scanner().await
431    }
432
433    /// Scans a region.
434    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
435    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
436        self.inner.scan_region(region_id, request)
437    }
438
439    /// Edits a region's metadata by [RegionEdit] directly. Use with care.
440    /// Currently we only allow adding files to or removing files from a region (the [RegionEdit] struct may only contain a non-empty "files_to_add" or "files_to_remove" field).
441    /// Any other region editing intention results in an "invalid request" error.
442    /// Also note that once a region has been edited directly, we MUST NOT write data to it thereafter.
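    ///
    /// A minimal sketch, assuming `file_meta` is a [FileMeta] describing an SST
    /// file that already exists under the region directory:
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     timestamp_ms: None,
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    ///     committed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```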
443    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
444        let _timer = HANDLE_REQUEST_ELAPSED
445            .with_label_values(&["edit_region"])
446            .start_timer();
447
448        ensure!(
449            is_valid_region_edit(&edit),
450            InvalidRequestSnafu {
451                region_id,
452                reason: "invalid region edit"
453            }
454        );
455
456        let (tx, rx) = oneshot::channel();
457        let request = WorkerRequest::EditRegion(RegionEditRequest {
458            region_id,
459            edit,
460            tx,
461        });
462        self.inner
463            .workers
464            .submit_to_worker(region_id, request)
465            .await?;
466        rx.await.context(RecvSnafu)?
467    }
468
469    /// Handles a copy-region-from request.
470    ///
471    /// This method is intended for internal use only and is not exposed in the trait implementation.
472    pub async fn copy_region_from(
473        &self,
474        region_id: RegionId,
475        request: MitoCopyRegionFromRequest,
476    ) -> Result<MitoCopyRegionFromResponse> {
477        self.inner.copy_region_from(region_id, request).await
478    }
479
480    #[cfg(test)]
481    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
482        self.find_region(id)
483    }
484
485    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
486        self.inner.workers.get_region(region_id)
487    }
488
489    /// Returns all regions.
490    pub fn regions(&self) -> Vec<MitoRegionRef> {
491        self.inner.workers.all_regions().collect()
492    }
493
494    fn encode_manifest_info_to_extensions(
495        region_id: &RegionId,
496        manifest_info: RegionManifestInfo,
497        extensions: &mut HashMap<String, Vec<u8>>,
498    ) -> Result<()> {
499        let region_manifest_info = vec![(*region_id, manifest_info)];
500
501        extensions.insert(
502            MANIFEST_INFO_EXTENSION_KEY.to_string(),
503            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
504        );
505        info!(
506            "Added manifest info: {:?} to extensions, region_id: {:?}",
507            region_manifest_info, region_id
508        );
509        Ok(())
510    }
511
512    fn encode_column_metadatas_to_extensions(
513        region_id: &RegionId,
514        column_metadatas: Vec<ColumnMetadata>,
515        extensions: &mut HashMap<String, Vec<u8>>,
516    ) -> Result<()> {
517        extensions.insert(
518            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
519            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
520        );
521        info!(
522            "Added column metadatas: {:?} to extensions, region_id: {:?}",
523            column_metadatas, region_id
524        );
525        Ok(())
526    }
527
528    /// Finds the stats of the current version's memtables and SSTs by region_id.
529    /// The stats must be collected in one place at one time to ensure data consistency.
530    pub fn find_memtable_and_sst_stats(
531        &self,
532        region_id: RegionId,
533    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
534        let region = self
535            .find_region(region_id)
536            .context(RegionNotFoundSnafu { region_id })?;
537
538        let version = region.version();
539        let memtable_stats = version
540            .memtables
541            .list_memtables()
542            .iter()
543            .map(|x| x.stats())
544            .collect::<Vec<_>>();
545
546        let sst_stats = version
547            .ssts
548            .levels()
549            .iter()
550            .flat_map(|level| level.files().map(|x| x.meta_ref()))
551            .cloned()
552            .collect::<Vec<_>>();
553        Ok((memtable_stats, sst_stats))
554    }
555
556    /// Lists all SSTs from the manifest of all regions in the engine.
557    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
558        let node_id = self.inner.workers.file_ref_manager().node_id();
559        let regions = self.inner.workers.all_regions();
560
561        let mut results = Vec::new();
562        for region in regions {
563            let mut entries = region.manifest_sst_entries().await;
564            for e in &mut entries {
565                e.node_id = node_id;
566            }
567            results.extend(entries);
568        }
569
570        results
571    }
572
573    /// Lists metadata about all puffin index targets stored in the engine.
574    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
575        let node_id = self.inner.workers.file_ref_manager().node_id();
576        let cache_manager = self.inner.workers.cache_manager();
577        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
578        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
579        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
580
581        let mut results = Vec::new();
582
583        for region in self.inner.workers.all_regions() {
584            let manifest_entries = region.manifest_sst_entries().await;
585            let access_layer = region.access_layer.clone();
586            let table_dir = access_layer.table_dir().to_string();
587            let path_type = access_layer.path_type();
588            let object_store = access_layer.object_store().clone();
589            let puffin_factory = access_layer.puffin_manager_factory().clone();
590            let path_factory = RegionFilePathFactory::new(table_dir, path_type);
591
592            let entry_futures = manifest_entries.into_iter().map(|entry| {
593                let object_store = object_store.clone();
594                let path_factory = path_factory.clone();
595                let puffin_factory = puffin_factory.clone();
596                let puffin_metadata_cache = puffin_metadata_cache.clone();
597                let bloom_filter_cache = bloom_filter_cache.clone();
598                let inverted_index_cache = inverted_index_cache.clone();
599
600                async move {
601                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
602                        return Vec::new();
603                    };
604
605                    let index_version = entry.index_version;
606                    let file_id = match FileId::parse_str(&entry.file_id) {
607                        Ok(file_id) => file_id,
608                        Err(err) => {
609                            warn!(
610                                err;
611                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
612                                entry.table_dir,
613                                entry.file_id
614                            );
615                            return Vec::new();
616                        }
617                    };
618                    let region_index_id = RegionIndexId::new(
619                        RegionFileId::new(entry.region_id, file_id),
620                        index_version,
621                    );
622                    let context = IndexEntryContext {
623                        table_dir: &entry.table_dir,
624                        index_file_path: index_file_path.as_str(),
625                        region_id: entry.region_id,
626                        table_id: entry.table_id,
627                        region_number: entry.region_number,
628                        region_group: entry.region_group,
629                        region_sequence: entry.region_sequence,
630                        file_id: &entry.file_id,
631                        index_file_size: entry.index_file_size,
632                        node_id,
633                    };
634
635                    let manager = puffin_factory
636                        .build(object_store, path_factory)
637                        .with_puffin_metadata_cache(puffin_metadata_cache);
638
639                    collect_index_entries_from_puffin(
640                        manager,
641                        region_index_id,
642                        context,
643                        bloom_filter_cache,
644                        inverted_index_cache,
645                    )
646                    .await
647                }
648            });
649
650            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8.
651            while let Some(mut metas) = meta_stream.next().await {
652                results.append(&mut metas);
653            }
654        }
655
656        results
657    }
658
659    /// Lists all SSTs from the storage layer of all regions in the engine.
660    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
661        let node_id = self.inner.workers.file_ref_manager().node_id();
662        let regions = self.inner.workers.all_regions();
663
664        let mut layers_distinct_table_dirs = HashMap::new();
665        for region in regions {
666            let table_dir = region.access_layer.table_dir();
667            if !layers_distinct_table_dirs.contains_key(table_dir) {
668                layers_distinct_table_dirs
669                    .insert(table_dir.to_string(), region.access_layer.clone());
670            }
671        }
672
673        stream::iter(layers_distinct_table_dirs)
674            .map(|(_, access_layer)| access_layer.storage_sst_entries())
675            .flatten()
676            .map(move |entry| {
677                entry.map(move |mut entry| {
678                    entry.node_id = node_id;
679                    entry
680                })
681            })
682    }
683}
684
685/// Checks whether the region edit is valid.
686///
687/// Only adding files to or removing files from a region is considered valid now.
688fn is_valid_region_edit(edit: &RegionEdit) -> bool {
689    (!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty())
690        && matches!(
691            edit,
692            RegionEdit {
693                files_to_add: _,
694                files_to_remove: _,
695                timestamp_ms: _,
696                compaction_time_window: None,
697                flushed_entry_id: None,
698                flushed_sequence: None,
699                ..
700            }
701        )
702}
703
704/// Inner struct of [MitoEngine].
705struct EngineInner {
706    /// Region workers group.
707    workers: WorkerGroup,
708    /// Config of the engine.
709    config: Arc<MitoConfig>,
710    /// The WAL raw entry reader.
711    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
712    /// Memory tracker for table scans.
713    scan_memory_tracker: QueryMemoryTracker,
714    #[cfg(feature = "enterprise")]
715    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
716}
717
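/// Region open requests grouped by the Kafka topic of their WAL provider.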
718type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
719
720/// Returns requests ([TopicGroupedRegionOpenRequests]) grouped by topic, plus the remaining requests.
721fn prepare_batch_open_requests(
722    requests: Vec<(RegionId, RegionOpenRequest)>,
723) -> Result<(
724    TopicGroupedRegionOpenRequests,
725    Vec<(RegionId, RegionOpenRequest)>,
726)> {
727    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
728    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
729    for (region_id, request) in requests {
730        match parse_wal_options(&request.options).context(SerdeJsonSnafu)? {
731            WalOptions::Kafka(options) => {
732                topic_to_regions
733                    .entry(options.topic)
734                    .or_default()
735                    .push((region_id, request));
736            }
737            WalOptions::RaftEngine | WalOptions::Noop => {
738                remaining_regions.push((region_id, request));
739            }
740        }
741    }
742
743    Ok((topic_to_regions, remaining_regions))
744}
745
746impl EngineInner {
747    #[cfg(feature = "enterprise")]
748    #[must_use]
749    fn with_extension_range_provider_factory(
750        self,
751        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
752    ) -> Self {
753        Self {
754            extension_range_provider_factory,
755            ..self
756        }
757    }
758
759    /// Stop the inner engine.
760    async fn stop(&self) -> Result<()> {
761        self.workers.stop().await
762    }
763
764    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
765        self.workers
766            .get_region(region_id)
767            .context(RegionNotFoundSnafu { region_id })
768    }
769
770    /// Get metadata of a region.
771    ///
772    /// Returns error if the region doesn't exist.
773    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
774        // Reading a region doesn't need to go through the region worker thread.
775        let region = self.find_region(region_id)?;
776        Ok(region.metadata())
777    }
778
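    /// Opens all regions that share the given Kafka `topic`, replaying WAL entries
    /// through a shared entry distributor.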
779    async fn open_topic_regions(
780        &self,
781        topic: String,
782        region_requests: Vec<(RegionId, RegionOpenRequest)>,
783    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
784        let now = Instant::now();
785        let region_ids = region_requests
786            .iter()
787            .map(|(region_id, _)| *region_id)
788            .collect::<Vec<_>>();
789        let provider = Provider::kafka_provider(topic);
790        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
791            provider.clone(),
792            self.wal_raw_entry_reader.clone(),
793            &region_ids,
794            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
795        );
796
797        let mut responses = Vec::with_capacity(region_requests.len());
798        for ((region_id, request), entry_receiver) in
799            region_requests.into_iter().zip(entry_receivers)
800        {
801            let (request, receiver) =
802                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
803            self.workers.submit_to_worker(region_id, request).await?;
804            responses.push(async move { receiver.await.context(RecvSnafu)? });
805        }
806
807        // Waits for entries distribution.
808        let distribution =
809            common_runtime::spawn_global(async move { distributor.distribute().await });
810        // Waits for worker returns.
811        let responses = join_all(responses).await;
812        distribution.await.context(JoinSnafu)??;
813
814        let num_failure = responses.iter().filter(|r| r.is_err()).count();
815        info!(
816            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
817            region_ids.len() - num_failure,
818            // Safety: provider is kafka provider.
819            provider.as_kafka_provider().unwrap(),
820            num_failure,
821            now.elapsed(),
822        );
823        Ok(region_ids.into_iter().zip(responses).collect())
824    }
825
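    /// Opens regions in batch: regions sharing a Kafka topic are opened together,
    /// while `parallelism` bounds the number of concurrent open tasks.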
826    async fn handle_batch_open_requests(
827        &self,
828        parallelism: usize,
829        requests: Vec<(RegionId, RegionOpenRequest)>,
830    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
831        let semaphore = Arc::new(Semaphore::new(parallelism));
832        let (topic_to_region_requests, remaining_region_requests) =
833            prepare_batch_open_requests(requests)?;
834        let mut responses =
835            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
836
837        if !topic_to_region_requests.is_empty() {
838            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
839            for (topic, region_requests) in topic_to_region_requests {
840                let semaphore_moved = semaphore.clone();
841                tasks.push(async move {
842                    // Safety: semaphore must exist
843                    let _permit = semaphore_moved.acquire().await.unwrap();
844                    self.open_topic_regions(topic, region_requests).await
845                })
846            }
847            let r = try_join_all(tasks).await?;
848            responses.extend(r.into_iter().flatten());
849        }
850
851        if !remaining_region_requests.is_empty() {
852            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
853            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
854            for (region_id, request) in remaining_region_requests {
855                let semaphore_moved = semaphore.clone();
856                region_ids.push(region_id);
857                tasks.push(async move {
858                    // Safety: semaphore must exist
859                    let _permit = semaphore_moved.acquire().await.unwrap();
860                    let (request, receiver) =
861                        WorkerRequest::new_open_region_request(region_id, request, None);
862
863                    self.workers.submit_to_worker(region_id, request).await?;
864
865                    receiver.await.context(RecvSnafu)?
866                })
867            }
868
869            let results = join_all(tasks).await;
870            responses.extend(region_ids.into_iter().zip(results));
871        }
872
873        Ok(responses)
874    }
875
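    /// Catches up all regions bound to the same Kafka `provider`, replaying WAL entries
    /// through a shared entry distributor.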
876    async fn catchup_topic_regions(
877        &self,
878        provider: Provider,
879        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
880    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
881        let now = Instant::now();
882        let region_ids = region_requests
883            .iter()
884            .map(|(region_id, _)| *region_id)
885            .collect::<Vec<_>>();
886        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
887            provider.clone(),
888            self.wal_raw_entry_reader.clone(),
889            &region_ids,
890            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
891        );
892
893        let mut responses = Vec::with_capacity(region_requests.len());
894        for ((region_id, request), entry_receiver) in
895            region_requests.into_iter().zip(entry_receivers)
896        {
897            let (request, receiver) =
898                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
899            self.workers.submit_to_worker(region_id, request).await?;
900            responses.push(async move { receiver.await.context(RecvSnafu)? });
901        }
902
903        // Wait for entries distribution.
904        let distribution =
905            common_runtime::spawn_global(async move { distributor.distribute().await });
906        // Wait for worker returns.
907        let responses = join_all(responses).await;
908        distribution.await.context(JoinSnafu)??;
909
910        let num_failure = responses.iter().filter(|r| r.is_err()).count();
911        info!(
912            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
913            region_ids.len() - num_failure,
914            // Safety: provider is kafka provider.
915            provider.as_kafka_provider().unwrap(),
916            num_failure,
917            now.elapsed(),
918        );
919
920        Ok(region_ids.into_iter().zip(responses).collect())
921    }
922
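    /// Catches up regions in batch: regions sharing a Kafka topic are grouped together,
    /// while `parallelism` bounds the number of concurrent catchup tasks.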
923    async fn handle_batch_catchup_requests(
924        &self,
925        parallelism: usize,
926        requests: Vec<(RegionId, RegionCatchupRequest)>,
927    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
928        let mut responses = Vec::with_capacity(requests.len());
929        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
930        let mut remaining_region_requests = vec![];
931
932        for (region_id, request) in requests {
933            match self.workers.get_region(region_id) {
934                Some(region) => match region.provider.as_kafka_provider() {
935                    Some(provider) => {
936                        topic_regions
937                            .entry(provider.clone())
938                            .or_default()
939                            .push((region_id, request));
940                    }
941                    None => {
942                        remaining_region_requests.push((region_id, request));
943                    }
944                },
945                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
946            }
947        }
948
949        let semaphore = Arc::new(Semaphore::new(parallelism));
950
951        if !topic_regions.is_empty() {
952            let mut tasks = Vec::with_capacity(topic_regions.len());
953            for (provider, region_requests) in topic_regions {
954                let semaphore_moved = semaphore.clone();
955                tasks.push(async move {
956                    // Safety: semaphore must exist
957                    let _permit = semaphore_moved.acquire().await.unwrap();
958                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
959                        .await
960                })
961            }
962
963            let r = try_join_all(tasks).await?;
964            responses.extend(r.into_iter().flatten());
965        }
966
967        if !remaining_region_requests.is_empty() {
968            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
969            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
970            for (region_id, request) in remaining_region_requests {
971                let semaphore_moved = semaphore.clone();
972                region_ids.push(region_id);
973                tasks.push(async move {
974                    // Safety: semaphore must exist
975                    let _permit = semaphore_moved.acquire().await.unwrap();
976                    let (request, receiver) =
977                        WorkerRequest::new_catchup_region_request(region_id, request, None);
978
979                    self.workers.submit_to_worker(region_id, request).await?;
980
981                    receiver.await.context(RecvSnafu)?
982                })
983            }
984
985            let results = join_all(tasks).await;
986            responses.extend(region_ids.into_iter().zip(results));
987        }
988
989        Ok(responses)
990    }
991
992    /// Handles a [RegionRequest] and returns its execution result.
993    async fn handle_request(
994        &self,
995        region_id: RegionId,
996        request: RegionRequest,
997    ) -> Result<AffectedRows> {
998        let region_metadata = self.get_metadata(region_id).ok();
999        let (request, receiver) =
1000            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
1001        self.workers.submit_to_worker(region_id, request).await?;
1002
1003        receiver.await.context(RecvSnafu)?
1004    }
1005
1006    /// Returns the sequence number of the latest committed data.
1007    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
1008        // Reading a region doesn't need to go through the region worker thread.
1009        self.find_region(region_id)
1010            .map(|r| r.find_committed_sequence())
1011    }
1012
1013    /// Handles the scan `request` and returns a [ScanRegion].
1014    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
1015    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
1016        let query_start = Instant::now();
1017        // Reading a region doesn't need to go through the region worker thread.
1018        let region = self.find_region(region_id)?;
1019        let version = region.version();
1020        // Get cache.
1021        let cache_manager = self.workers.cache_manager();
1022
1023        let scan_region = ScanRegion::new(
1024            version,
1025            region.access_layer.clone(),
1026            request,
1027            CacheStrategy::EnableAll(cache_manager),
1028        )
1029        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
1030        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
1031        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
1032        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
1033        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
1034        .with_start_time(query_start);
1035
1036        #[cfg(feature = "enterprise")]
1037        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
1038
1039        Ok(scan_region)
1040    }
1041
1042    #[cfg(feature = "enterprise")]
1043    fn maybe_fill_extension_range_provider(
1044        &self,
1045        mut scan_region: ScanRegion,
1046        region: MitoRegionRef,
1047    ) -> ScanRegion {
1048        if region.is_follower()
1049            && let Some(factory) = self.extension_range_provider_factory.as_ref()
1050        {
1051            scan_region
1052                .set_extension_range_provider(factory.create_extension_range_provider(region));
1053        }
1054        scan_region
1055    }
1056
1057    /// Sets the [`RegionRole`] of the region.
1058    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
1059        let region = self.find_region(region_id)?;
1060        region.set_role(role);
1061        Ok(())
1062    }
1063
1064    /// Sets a region to read-only and ensures no more writes happen in the region after it returns.
1065    async fn set_region_role_state_gracefully(
1066        &self,
1067        region_id: RegionId,
1068        region_role_state: SettableRegionRoleState,
1069    ) -> Result<SetRegionRoleStateResponse> {
1070        // Note: this needs mutable ownership of the region to ensure no other thread is
1071        // modifying it concurrently; therefore, we submit the request to the worker.
1072        let (request, receiver) =
1073            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
1074        self.workers.submit_to_worker(region_id, request).await?;
1075
1076        receiver.await.context(RecvSnafu)
1077    }
1078
1079    async fn sync_region(
1080        &self,
1081        region_id: RegionId,
1082        manifest_info: RegionManifestInfo,
1083    ) -> Result<(ManifestVersion, bool)> {
1084        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
1085        let manifest_version = manifest_info.data_manifest_version();
1086        let (request, receiver) =
1087            WorkerRequest::new_sync_region_request(region_id, manifest_version);
1088        self.workers.submit_to_worker(region_id, request).await?;
1089
1090        receiver.await.context(RecvSnafu)?
1091    }
1092
1093    async fn remap_manifests(
1094        &self,
1095        request: RemapManifestsRequest,
1096    ) -> Result<RemapManifestsResponse> {
1097        let region_id = request.region_id;
1098        let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
1099        self.workers.submit_to_worker(region_id, request).await?;
1100        let manifest_paths = receiver.await.context(RecvSnafu)??;
1101        Ok(RemapManifestsResponse { manifest_paths })
1102    }
1103
1104    async fn copy_region_from(
1105        &self,
1106        region_id: RegionId,
1107        request: MitoCopyRegionFromRequest,
1108    ) -> Result<MitoCopyRegionFromResponse> {
1109        let (request, receiver) =
1110            WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
1111        self.workers.submit_to_worker(region_id, request).await?;
1112        let response = receiver.await.context(RecvSnafu)??;
1113        Ok(response)
1114    }
1115
1116    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1117        self.workers.get_region(region_id).map(|region| {
1118            if region.is_follower() {
1119                RegionRole::Follower
1120            } else {
1121                RegionRole::Leader
1122            }
1123        })
1124    }
1125}
1126
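/// Converts per-region worker results into [BatchResponses], boxing any errors.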
1127fn map_batch_responses(responses: Vec<(RegionId, Result<AffectedRows>)>) -> BatchResponses {
1128    responses
1129        .into_iter()
1130        .map(|(region_id, response)| {
1131            (
1132                region_id,
1133                response.map(RegionResponse::new).map_err(BoxedError::new),
1134            )
1135        })
1136        .collect()
1137}
1138
1139#[async_trait]
1140impl RegionEngine for MitoEngine {
1141    fn name(&self) -> &str {
1142        MITO_ENGINE_NAME
1143    }
1144
1145    #[tracing::instrument(skip_all)]
1146    async fn handle_batch_open_requests(
1147        &self,
1148        parallelism: usize,
1149        requests: Vec<(RegionId, RegionOpenRequest)>,
1150    ) -> Result<BatchResponses, BoxedError> {
1151        // TODO(weny): add metrics.
1152        self.inner
1153            .handle_batch_open_requests(parallelism, requests)
1154            .await
1155            .map(map_batch_responses)
1156            .map_err(BoxedError::new)
1157    }
1158
1159    #[tracing::instrument(skip_all)]
1160    async fn handle_batch_catchup_requests(
1161        &self,
1162        parallelism: usize,
1163        requests: Vec<(RegionId, RegionCatchupRequest)>,
1164    ) -> Result<BatchResponses, BoxedError> {
1165        self.inner
1166            .handle_batch_catchup_requests(parallelism, requests)
1167            .await
1168            .map(map_batch_responses)
1169            .map_err(BoxedError::new)
1170    }
1171
1172    #[tracing::instrument(skip_all)]
1173    async fn handle_request(
1174        &self,
1175        region_id: RegionId,
1176        request: RegionRequest,
1177    ) -> Result<RegionResponse, BoxedError> {
1178        let _timer = HANDLE_REQUEST_ELAPSED
1179            .with_label_values(&[request.request_type()])
1180            .start_timer();
1181
1182        let is_alter = matches!(request, RegionRequest::Alter(_));
1183        let is_create = matches!(request, RegionRequest::Create(_));
1184        let mut response = self
1185            .inner
1186            .handle_request(region_id, request)
1187            .await
1188            .map(RegionResponse::new)
1189            .map_err(BoxedError::new)?;
1190
1191        if is_alter {
1192            self.handle_alter_response(region_id, &mut response)
1193                .map_err(BoxedError::new)?;
1194        } else if is_create {
1195            self.handle_create_response(region_id, &mut response)
1196                .map_err(BoxedError::new)?;
1197        }
1198
1199        Ok(response)
1200    }
1201
1202    #[tracing::instrument(skip_all)]
1203    async fn handle_query(
1204        &self,
1205        region_id: RegionId,
1206        request: ScanRequest,
1207    ) -> Result<RegionScannerRef, BoxedError> {
1208        self.scan_region(region_id, request)
1209            .map_err(BoxedError::new)?
1210            .region_scanner()
1211            .await
1212            .map_err(BoxedError::new)
1213    }
1214
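    /// Registers and returns a [MemoryPermit] from the engine's scan memory tracker.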
1215    fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
1216        Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
1217    }
1218
1219    async fn get_committed_sequence(
1220        &self,
1221        region_id: RegionId,
1222    ) -> Result<SequenceNumber, BoxedError> {
1223        self.inner
1224            .get_committed_sequence(region_id)
1225            .map_err(BoxedError::new)
1226    }
1227
1228    /// Retrieves the region's metadata.
1229    async fn get_metadata(
1230        &self,
1231        region_id: RegionId,
1232    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
1233        self.inner.get_metadata(region_id).map_err(BoxedError::new)
1234    }
1235
1236    /// Stop the engine.
1237    ///
1238    /// Stopping the engine doesn't stop the underlying log store as other components might
1239    /// still use it. (When no other components are referencing the log store, it will
1240    /// automatically shut down.)
1241    async fn stop(&self) -> std::result::Result<(), BoxedError> {
1242        self.inner.stop().await.map_err(BoxedError::new)
1243    }
1244
1245    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
1246        self.get_region_statistic(region_id)
1247    }
1248
1249    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
1250        self.inner
1251            .set_region_role(region_id, role)
1252            .map_err(BoxedError::new)
1253    }
1254
1255    async fn set_region_role_state_gracefully(
1256        &self,
1257        region_id: RegionId,
1258        region_role_state: SettableRegionRoleState,
1259    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
1260        let _timer = HANDLE_REQUEST_ELAPSED
1261            .with_label_values(&["set_region_role_state_gracefully"])
1262            .start_timer();
1263
1264        self.inner
1265            .set_region_role_state_gracefully(region_id, region_role_state)
1266            .await
1267            .map_err(BoxedError::new)
1268    }
1269
1270    async fn sync_region(
1271        &self,
1272        region_id: RegionId,
1273        request: SyncRegionFromRequest,
1274    ) -> Result<SyncRegionFromResponse, BoxedError> {
1275        let manifest_info = request
1276            .into_region_manifest_info()
1277            .context(UnexpectedSnafu {
1278                err_msg: "Expected a manifest info request",
1279            })
1280            .map_err(BoxedError::new)?;
1281        let (_, synced) = self
1282            .inner
1283            .sync_region(region_id, manifest_info)
1284            .await
1285            .map_err(BoxedError::new)?;
1286
1287        Ok(SyncRegionFromResponse::Mito { synced })
1288    }
1289
1290    async fn remap_manifests(
1291        &self,
1292        request: RemapManifestsRequest,
1293    ) -> Result<RemapManifestsResponse, BoxedError> {
1294        self.inner
1295            .remap_manifests(request)
1296            .await
1297            .map_err(BoxedError::new)
1298    }
1299
1300    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1301        self.inner.role(region_id)
1302    }
1303
1304    fn as_any(&self) -> &dyn Any {
1305        self
1306    }
1307}
1308
1309impl MitoEngine {
1310    fn handle_alter_response(
1311        &self,
1312        region_id: RegionId,
1313        response: &mut RegionResponse,
1314    ) -> Result<()> {
1315        if let Some(statistic) = self.region_statistic(region_id) {
1316            Self::encode_manifest_info_to_extensions(
1317                &region_id,
1318                statistic.manifest,
1319                &mut response.extensions,
1320            )?;
1321        }
1322        let column_metadatas = self
1323            .inner
1324            .find_region(region_id)
1325            .ok()
1326            .map(|r| r.metadata().column_metadatas.clone());
1327        if let Some(column_metadatas) = column_metadatas {
1328            Self::encode_column_metadatas_to_extensions(
1329                &region_id,
1330                column_metadatas,
1331                &mut response.extensions,
1332            )?;
1333        }
1334        Ok(())
1335    }
1336
1337    fn handle_create_response(
1338        &self,
1339        region_id: RegionId,
1340        response: &mut RegionResponse,
1341    ) -> Result<()> {
1342        let column_metadatas = self
1343            .inner
1344            .find_region(region_id)
1345            .ok()
1346            .map(|r| r.metadata().column_metadatas.clone());
1347        if let Some(column_metadatas) = column_metadatas {
1348            Self::encode_column_metadatas_to_extensions(
1349                &region_id,
1350                column_metadatas,
1351                &mut response.extensions,
1352            )?;
1353        }
1354        Ok(())
1355    }
1356}
1357
1358// Test methods.
1359#[cfg(any(test, feature = "test"))]
1360#[allow(clippy::too_many_arguments)]
1361impl MitoEngine {
1362    /// Returns a new [MitoEngine] for tests.
1363    pub async fn new_for_test<S: LogStore>(
1364        data_home: &str,
1365        mut config: MitoConfig,
1366        log_store: Arc<S>,
1367        object_store_manager: ObjectStoreManagerRef,
1368        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
1369        listener: Option<crate::engine::listener::EventListenerRef>,
1370        time_provider: crate::time_provider::TimeProviderRef,
1371        schema_metadata_manager: SchemaMetadataManagerRef,
1372        file_ref_manager: FileReferenceManagerRef,
1373        partition_expr_fetcher: PartitionExprFetcherRef,
1374    ) -> Result<MitoEngine> {
1375        config.sanitize(data_home)?;
1376
1377        let config = Arc::new(config);
1378        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
1379        let total_memory = get_total_memory_bytes().max(0) as u64;
1380        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
1381        let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
1382            .with_on_update(|usage| {
1383                SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
1384            })
1385            .with_on_reject(|| {
1386                SCAN_REQUESTS_REJECTED_TOTAL.inc();
1387            });
1388        Ok(MitoEngine {
1389            inner: Arc::new(EngineInner {
1390                workers: WorkerGroup::start_for_test(
1391                    config.clone(),
1392                    log_store,
1393                    object_store_manager,
1394                    write_buffer_manager,
1395                    listener,
1396                    schema_metadata_manager,
1397                    file_ref_manager,
1398                    time_provider,
1399                    partition_expr_fetcher,
1400                )
1401                .await?,
1402                config,
1403                wal_raw_entry_reader,
1404                scan_memory_tracker,
1405                #[cfg(feature = "enterprise")]
1406                extension_range_provider_factory: None,
1407            }),
1408        })
1409    }
1410
1411    /// Returns the purge scheduler.
1412    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
1413        self.inner.workers.purge_scheduler()
1414    }
1415}
1416
1417#[cfg(test)]
1418mod tests {
1419    use std::time::Duration;
1420
1421    use super::*;
1422    use crate::sst::file::FileMeta;
1423
1424    #[test]
1425    fn test_is_valid_region_edit() {
1426        // Valid: has only "files_to_add"
1427        let edit = RegionEdit {
1428            files_to_add: vec![FileMeta::default()],
1429            files_to_remove: vec![],
1430            timestamp_ms: None,
1431            compaction_time_window: None,
1432            flushed_entry_id: None,
1433            flushed_sequence: None,
1434            committed_sequence: None,
1435        };
1436        assert!(is_valid_region_edit(&edit));
1437
1438        // Invalid: "files_to_add" and "files_to_remove" are both empty
1439        let edit = RegionEdit {
1440            files_to_add: vec![],
1441            files_to_remove: vec![],
1442            timestamp_ms: None,
1443            compaction_time_window: None,
1444            flushed_entry_id: None,
1445            flushed_sequence: None,
1446            committed_sequence: None,
1447        };
1448        assert!(!is_valid_region_edit(&edit));
1449
1450        // Valid: "files_to_remove" is not empty
1451        let edit = RegionEdit {
1452            files_to_add: vec![FileMeta::default()],
1453            files_to_remove: vec![FileMeta::default()],
1454            timestamp_ms: None,
1455            compaction_time_window: None,
1456            flushed_entry_id: None,
1457            flushed_sequence: None,
1458            committed_sequence: None,
1459        };
1460        assert!(is_valid_region_edit(&edit));
1461
1462        // Invalid: other fields are not all "None"s
1463        let edit = RegionEdit {
1464            files_to_add: vec![FileMeta::default()],
1465            files_to_remove: vec![],
1466            timestamp_ms: None,
1467            compaction_time_window: Some(Duration::from_secs(1)),
1468            flushed_entry_id: None,
1469            flushed_sequence: None,
1470            committed_sequence: None,
1471        };
1472        assert!(!is_valid_region_edit(&edit));
1473        let edit = RegionEdit {
1474            files_to_add: vec![FileMeta::default()],
1475            files_to_remove: vec![],
1476            timestamp_ms: None,
1477            compaction_time_window: None,
1478            flushed_entry_id: Some(1),
1479            flushed_sequence: None,
1480            committed_sequence: None,
1481        };
1482        assert!(!is_valid_region_edit(&edit));
1483        let edit = RegionEdit {
1484            files_to_add: vec![FileMeta::default()],
1485            files_to_remove: vec![],
1486            timestamp_ms: None,
1487            compaction_time_window: None,
1488            flushed_entry_id: None,
1489            flushed_sequence: Some(1),
1490            committed_sequence: None,
1491        };
1492        assert!(!is_valid_region_edit(&edit));
1493    }
1494}