mito2/engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region engine.
16
17#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34pub(crate) mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59#[cfg(test)]
60mod row_selector_test;
61#[cfg(test)]
62mod scan_corrupt;
63#[cfg(test)]
64mod scan_test;
65#[cfg(test)]
66mod set_role_state_test;
67#[cfg(test)]
68mod staging_test;
69#[cfg(test)]
70mod sync_test;
71#[cfg(test)]
72mod truncate_test;
73
74#[cfg(test)]
75mod remap_manifests_test;
76
77mod puffin_index;
78
79use std::any::Any;
80use std::collections::HashMap;
81use std::sync::Arc;
82use std::time::Instant;
83
84use api::region::RegionResponse;
85use async_trait::async_trait;
86use common_base::Plugins;
87use common_error::ext::BoxedError;
88use common_meta::key::SchemaMetadataManagerRef;
89use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
90use common_stat::get_total_memory_bytes;
91use common_telemetry::{info, tracing, warn};
92use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
93use futures::future::{join_all, try_join_all};
94use futures::stream::{self, Stream, StreamExt};
95use object_store::manager::ObjectStoreManagerRef;
96use snafu::{OptionExt, ResultExt, ensure};
97use store_api::ManifestVersion;
98use store_api::codec::PrimaryKeyEncoding;
99use store_api::logstore::LogStore;
100use store_api::logstore::provider::{KafkaProvider, Provider};
101use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
102use store_api::metric_engine_consts::{
103    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
104};
105use store_api::region_engine::{
106    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
107    RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
108    SettableRegionRoleState, SyncManifestResponse,
109};
110use store_api::region_request::{
111    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
112};
113use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
114use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
115use tokio::sync::{Semaphore, oneshot};
116
117use crate::access_layer::RegionFilePathFactory;
118use crate::cache::{CacheManagerRef, CacheStrategy};
119use crate::config::MitoConfig;
120use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
121use crate::error::{
122    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
123    SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
124};
125#[cfg(feature = "enterprise")]
126use crate::extension::BoxedExtensionRangeProviderFactory;
127use crate::gc::GcLimiterRef;
128use crate::manifest::action::RegionEdit;
129use crate::memtable::MemtableStats;
130use crate::metrics::{
131    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
132};
133use crate::read::scan_region::{ScanRegion, Scanner};
134use crate::read::stream::ScanBatchStream;
135use crate::region::MitoRegionRef;
136use crate::region::opener::PartitionExprFetcherRef;
137use crate::request::{RegionEditRequest, WorkerRequest};
138use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
139use crate::sst::file_ref::FileReferenceManagerRef;
140use crate::wal::entry_distributor::{
141    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
142};
143use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
144use crate::worker::WorkerGroup;
145
146pub const MITO_ENGINE_NAME: &str = "mito";
147
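/// Builder that assembles a [MitoEngine]: it sanitizes the config, starts the worker
/// group, and wires up the WAL entry reader and scan memory tracker.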
148pub struct MitoEngineBuilder<'a, S: LogStore> {
149    data_home: &'a str,
150    config: MitoConfig,
151    log_store: Arc<S>,
152    object_store_manager: ObjectStoreManagerRef,
153    schema_metadata_manager: SchemaMetadataManagerRef,
154    file_ref_manager: FileReferenceManagerRef,
155    partition_expr_fetcher: PartitionExprFetcherRef,
156    plugins: Plugins,
157    max_concurrent_queries: usize,
158    #[cfg(feature = "enterprise")]
159    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
160}
161
162impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
163    #[allow(clippy::too_many_arguments)]
164    pub fn new(
165        data_home: &'a str,
166        config: MitoConfig,
167        log_store: Arc<S>,
168        object_store_manager: ObjectStoreManagerRef,
169        schema_metadata_manager: SchemaMetadataManagerRef,
170        file_ref_manager: FileReferenceManagerRef,
171        partition_expr_fetcher: PartitionExprFetcherRef,
172        plugins: Plugins,
173        max_concurrent_queries: usize,
174    ) -> Self {
175        Self {
176            data_home,
177            config,
178            log_store,
179            object_store_manager,
180            schema_metadata_manager,
181            file_ref_manager,
182            plugins,
183            partition_expr_fetcher,
184            max_concurrent_queries,
185            #[cfg(feature = "enterprise")]
186            extension_range_provider_factory: None,
187        }
188    }
189
190    #[cfg(feature = "enterprise")]
191    #[must_use]
192    pub fn with_extension_range_provider_factory(
193        self,
194        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
195    ) -> Self {
196        Self {
197            extension_range_provider_factory,
198            ..self
199        }
200    }
201
202    pub async fn try_build(mut self) -> Result<MitoEngine> {
203        self.config.sanitize(self.data_home)?;
204
205        let config = Arc::new(self.config);
206        let workers = WorkerGroup::start(
207            config.clone(),
208            self.log_store.clone(),
209            self.object_store_manager,
210            self.schema_metadata_manager,
211            self.file_ref_manager,
212            self.partition_expr_fetcher.clone(),
213            self.plugins,
214        )
215        .await?;
216        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
217        let total_memory = get_total_memory_bytes().max(0) as u64;
218        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
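        // Track memory used by scans: report the current usage as a gauge and count
        // scan requests rejected by the tracker.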
219        let scan_memory_tracker =
220            QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
221                .with_on_update(|usage| {
222                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
223                })
224                .with_on_reject(|| {
225                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
226                });
227
228        let inner = EngineInner {
229            workers,
230            config,
231            wal_raw_entry_reader,
232            scan_memory_tracker,
233            #[cfg(feature = "enterprise")]
234            extension_range_provider_factory: None,
235        };
236
237        #[cfg(feature = "enterprise")]
238        let inner =
239            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
240
241        Ok(MitoEngine {
242            inner: Arc::new(inner),
243        })
244    }
245}
246
247/// Region engine implementation for timeseries data.
248#[derive(Clone)]
249pub struct MitoEngine {
250    inner: Arc<EngineInner>,
251}
252
253impl MitoEngine {
    /// Returns a new [MitoEngine] with the given `config`, `log_store` and `object_store_manager`.
255    #[allow(clippy::too_many_arguments)]
256    pub async fn new<S: LogStore>(
257        data_home: &str,
258        config: MitoConfig,
259        log_store: Arc<S>,
260        object_store_manager: ObjectStoreManagerRef,
261        schema_metadata_manager: SchemaMetadataManagerRef,
262        file_ref_manager: FileReferenceManagerRef,
263        partition_expr_fetcher: PartitionExprFetcherRef,
264        plugins: Plugins,
265    ) -> Result<MitoEngine> {
266        let builder = MitoEngineBuilder::new(
267            data_home,
268            config,
269            log_store,
270            object_store_manager,
271            schema_metadata_manager,
272            file_ref_manager,
273            partition_expr_fetcher,
274            plugins,
275            0, // Default: no limit on concurrent queries
276        );
277        builder.try_build().await
278    }
279
280    pub fn mito_config(&self) -> &MitoConfig {
281        &self.inner.config
282    }
283
284    pub fn cache_manager(&self) -> CacheManagerRef {
285        self.inner.workers.cache_manager()
286    }
287
288    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
289        self.inner.workers.file_ref_manager()
290    }
291
292    pub fn gc_limiter(&self) -> GcLimiterRef {
293        self.inner.workers.gc_limiter()
294    }
295
    /// Gets all temporary ref files for the given region ids, excluding files that are already in the manifest.
297    pub async fn get_snapshot_of_file_refs(
298        &self,
299        file_handle_regions: impl IntoIterator<Item = RegionId>,
300        manifest_regions: HashMap<RegionId, Vec<RegionId>>,
301    ) -> Result<FileRefsManifest> {
302        let file_ref_mgr = self.file_ref_manager();
303
304        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
        // Convert region IDs to MitoRegionRef objects, ignoring regions that do not exist on the current datanode,
        // as regions on other datanodes are not managed by this engine.
307        let query_regions: Vec<MitoRegionRef> = file_handle_regions
308            .into_iter()
309            .filter_map(|region_id| self.find_region(region_id))
310            .collect();
311
312        let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
313            .into_iter()
314            .filter_map(|(related_region, queries)| {
315                self.find_region(related_region).map(|r| (r, queries))
316            })
317            .collect();
318
319        file_ref_mgr
320            .get_snapshot_of_file_refs(query_regions, related_regions)
321            .await
322    }
323
324    /// Returns true if the specific region exists.
325    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
326        self.inner.workers.is_region_exists(region_id)
327    }
328
    /// Returns true if the specific region is opening.
330    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
331        self.inner.workers.is_region_opening(region_id)
332    }
333
334    /// Returns true if the specific region is catching up.
335    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
336        self.inner.workers.is_region_catching_up(region_id)
337    }
338
339    /// Returns the region disk/memory statistic.
340    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
341        self.find_region(region_id)
342            .map(|region| region.region_statistic())
343    }
344
    /// Returns the primary key encoding of the region.
346    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
347        self.find_region(region_id)
348            .map(|r| r.primary_key_encoding())
349    }
350
    /// Handles a Substrait query and returns a stream of record batches.
    ///
    /// Note that the ordering of the output stream is not guaranteed. If ordering
    /// matters, use [`scanner`] to build a [`Scanner`] and consume it instead.
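    ///
    /// A minimal usage sketch (not compiled; assumes an initialized engine and an
    /// existing region id):
    ///
    /// ```ignore
    /// let stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// ```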
355    #[tracing::instrument(skip_all)]
356    pub async fn scan_to_stream(
357        &self,
358        region_id: RegionId,
359        request: ScanRequest,
360    ) -> Result<SendableRecordBatchStream, BoxedError> {
361        self.scanner(region_id, request)
362            .await
363            .map_err(BoxedError::new)?
364            .scan()
365            .await
366    }
367
368    /// Scan [`Batch`]es by [`ScanRequest`].
369    pub async fn scan_batch(
370        &self,
371        region_id: RegionId,
372        request: ScanRequest,
373        filter_deleted: bool,
374    ) -> Result<ScanBatchStream> {
375        let mut scan_region = self.scan_region(region_id, request)?;
376        scan_region.set_filter_deleted(filter_deleted);
377        scan_region.scanner().await?.scan_batch()
378    }
379
380    /// Returns a scanner to scan for `request`.
381    pub(crate) async fn scanner(
382        &self,
383        region_id: RegionId,
384        request: ScanRequest,
385    ) -> Result<Scanner> {
386        self.scan_region(region_id, request)?.scanner().await
387    }
388
389    /// Scans a region.
390    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
391        self.inner.scan_region(region_id, request)
392    }
393
    /// Edits a region's metadata by [RegionEdit] directly. Use with care.
    /// Currently we only allow adding files to a region (the [RegionEdit] struct may only contain a non-empty "files_to_add" field).
    /// Any other editing intention results in an "invalid request" error.
    /// Also note that if a region is edited directly, we MUST NOT write data to it thereafter.
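    ///
    /// A minimal sketch of a valid edit (not compiled; assumes an engine handle, a region
    /// id, and a hypothetical `file_meta` describing an SST that already exists in the
    /// region's storage):
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     timestamp_ms: None,
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    ///     committed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```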
398    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
399        let _timer = HANDLE_REQUEST_ELAPSED
400            .with_label_values(&["edit_region"])
401            .start_timer();
402
403        ensure!(
404            is_valid_region_edit(&edit),
405            InvalidRequestSnafu {
406                region_id,
407                reason: "invalid region edit"
408            }
409        );
410
411        let (tx, rx) = oneshot::channel();
412        let request = WorkerRequest::EditRegion(RegionEditRequest {
413            region_id,
414            edit,
415            tx,
416        });
417        self.inner
418            .workers
419            .submit_to_worker(region_id, request)
420            .await?;
421        rx.await.context(RecvSnafu)?
422    }
423
424    #[cfg(test)]
425    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
426        self.find_region(id)
427    }
428
429    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
430        self.inner.workers.get_region(region_id)
431    }
432
433    fn encode_manifest_info_to_extensions(
434        region_id: &RegionId,
435        manifest_info: RegionManifestInfo,
436        extensions: &mut HashMap<String, Vec<u8>>,
437    ) -> Result<()> {
438        let region_manifest_info = vec![(*region_id, manifest_info)];
439
440        extensions.insert(
441            MANIFEST_INFO_EXTENSION_KEY.to_string(),
442            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
443        );
444        info!(
445            "Added manifest info: {:?} to extensions, region_id: {:?}",
446            region_manifest_info, region_id
447        );
448        Ok(())
449    }
450
451    fn encode_column_metadatas_to_extensions(
452        region_id: &RegionId,
453        column_metadatas: Vec<ColumnMetadata>,
454        extensions: &mut HashMap<String, Vec<u8>>,
455    ) -> Result<()> {
456        extensions.insert(
457            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
458            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
459        );
460        info!(
461            "Added column metadatas: {:?} to extensions, region_id: {:?}",
462            column_metadatas, region_id
463        );
464        Ok(())
465    }
466
    /// Finds the memtable and SST stats of the current version by `region_id`.
    /// The stats must be collected at a single point in time to ensure data consistency.
469    pub fn find_memtable_and_sst_stats(
470        &self,
471        region_id: RegionId,
472    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
473        let region = self
474            .find_region(region_id)
475            .context(RegionNotFoundSnafu { region_id })?;
476
477        let version = region.version();
478        let memtable_stats = version
479            .memtables
480            .list_memtables()
481            .iter()
482            .map(|x| x.stats())
483            .collect::<Vec<_>>();
484
485        let sst_stats = version
486            .ssts
487            .levels()
488            .iter()
489            .flat_map(|level| level.files().map(|x| x.meta_ref()))
490            .cloned()
491            .collect::<Vec<_>>();
492        Ok((memtable_stats, sst_stats))
493    }
494
495    /// Lists all SSTs from the manifest of all regions in the engine.
496    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
497        let node_id = self.inner.workers.file_ref_manager().node_id();
498        let regions = self.inner.workers.all_regions();
499
500        let mut results = Vec::new();
501        for region in regions {
502            let mut entries = region.manifest_sst_entries().await;
503            for e in &mut entries {
504                e.node_id = node_id;
505            }
506            results.extend(entries);
507        }
508
509        results
510    }
511
512    /// Lists metadata about all puffin index targets stored in the engine.
513    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
514        let node_id = self.inner.workers.file_ref_manager().node_id();
515        let cache_manager = self.inner.workers.cache_manager();
516        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
517        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
518        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
519
520        let mut results = Vec::new();
521
522        for region in self.inner.workers.all_regions() {
523            let manifest_entries = region.manifest_sst_entries().await;
524            let access_layer = region.access_layer.clone();
525            let table_dir = access_layer.table_dir().to_string();
526            let path_type = access_layer.path_type();
527            let object_store = access_layer.object_store().clone();
528            let puffin_factory = access_layer.puffin_manager_factory().clone();
529            let path_factory = RegionFilePathFactory::new(table_dir, path_type);
530
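            // Build one future per SST entry; each future opens the entry's puffin index
            // file (if any) and collects its index metadata.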
531            let entry_futures = manifest_entries.into_iter().map(|entry| {
532                let object_store = object_store.clone();
533                let path_factory = path_factory.clone();
534                let puffin_factory = puffin_factory.clone();
535                let puffin_metadata_cache = puffin_metadata_cache.clone();
536                let bloom_filter_cache = bloom_filter_cache.clone();
537                let inverted_index_cache = inverted_index_cache.clone();
538
539                async move {
540                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
541                        return Vec::new();
542                    };
543
544                    let index_version = entry.index_version;
545                    let file_id = match FileId::parse_str(&entry.file_id) {
546                        Ok(file_id) => file_id,
547                        Err(err) => {
548                            warn!(
549                                err;
550                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
551                                entry.table_dir,
552                                entry.file_id
553                            );
554                            return Vec::new();
555                        }
556                    };
557                    let region_index_id = RegionIndexId::new(
558                        RegionFileId::new(entry.region_id, file_id),
559                        index_version,
560                    );
561                    let context = IndexEntryContext {
562                        table_dir: &entry.table_dir,
563                        index_file_path: index_file_path.as_str(),
564                        region_id: entry.region_id,
565                        table_id: entry.table_id,
566                        region_number: entry.region_number,
567                        region_group: entry.region_group,
568                        region_sequence: entry.region_sequence,
569                        file_id: &entry.file_id,
570                        index_file_size: entry.index_file_size,
571                        node_id,
572                    };
573
574                    let manager = puffin_factory
575                        .build(object_store, path_factory)
576                        .with_puffin_metadata_cache(puffin_metadata_cache);
577
578                    collect_index_entries_from_puffin(
579                        manager,
580                        region_index_id,
581                        context,
582                        bloom_filter_cache,
583                        inverted_index_cache,
584                    )
585                    .await
586                }
587            });
588
589            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8.
590            while let Some(mut metas) = meta_stream.next().await {
591                results.append(&mut metas);
592            }
593        }
594
595        results
596    }
597
598    /// Lists all SSTs from the storage layer of all regions in the engine.
599    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
600        let node_id = self.inner.workers.file_ref_manager().node_id();
601        let regions = self.inner.workers.all_regions();
602
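        // Deduplicate access layers by table dir so each table directory is listed only
        // once, even when multiple regions share it.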
603        let mut layers_distinct_table_dirs = HashMap::new();
604        for region in regions {
605            let table_dir = region.access_layer.table_dir();
606            if !layers_distinct_table_dirs.contains_key(table_dir) {
607                layers_distinct_table_dirs
608                    .insert(table_dir.to_string(), region.access_layer.clone());
609            }
610        }
611
612        stream::iter(layers_distinct_table_dirs)
613            .map(|(_, access_layer)| access_layer.storage_sst_entries())
614            .flatten()
615            .map(move |entry| {
616                entry.map(move |mut entry| {
617                    entry.node_id = node_id;
618                    entry
619                })
620            })
621    }
622}
623
/// Checks whether the region edit is valid. Currently, only adding files to a region is considered valid.
625fn is_valid_region_edit(edit: &RegionEdit) -> bool {
626    !edit.files_to_add.is_empty()
627        && edit.files_to_remove.is_empty()
628        && matches!(
629            edit,
630            RegionEdit {
631                files_to_add: _,
632                files_to_remove: _,
633                timestamp_ms: _,
634                compaction_time_window: None,
635                flushed_entry_id: None,
636                flushed_sequence: None,
637                ..
638            }
639        )
640}
641
642/// Inner struct of [MitoEngine].
643struct EngineInner {
644    /// Region workers group.
645    workers: WorkerGroup,
646    /// Config of the engine.
647    config: Arc<MitoConfig>,
648    /// The Wal raw entry reader.
649    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
650    /// Memory tracker for table scans.
651    scan_memory_tracker: QueryMemoryTracker,
652    #[cfg(feature = "enterprise")]
653    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
654}
655
656type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
657
/// Returns the requests ([TopicGroupedRegionOpenRequests]) grouped by topic, along with the remaining requests.
659fn prepare_batch_open_requests(
660    requests: Vec<(RegionId, RegionOpenRequest)>,
661) -> Result<(
662    TopicGroupedRegionOpenRequests,
663    Vec<(RegionId, RegionOpenRequest)>,
664)> {
665    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
666    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
667    for (region_id, request) in requests {
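        // Decode the WAL options of the request; fall back to the local raft-engine WAL
        // when no option is present.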
668        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
669            serde_json::from_str(options).context(SerdeJsonSnafu)?
670        } else {
671            WalOptions::RaftEngine
672        };
673        match options {
674            WalOptions::Kafka(options) => {
675                topic_to_regions
676                    .entry(options.topic)
677                    .or_default()
678                    .push((region_id, request));
679            }
680            WalOptions::RaftEngine | WalOptions::Noop => {
681                remaining_regions.push((region_id, request));
682            }
683        }
684    }
685
686    Ok((topic_to_regions, remaining_regions))
687}
688
689impl EngineInner {
690    #[cfg(feature = "enterprise")]
691    #[must_use]
692    fn with_extension_range_provider_factory(
693        self,
694        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
695    ) -> Self {
696        Self {
697            extension_range_provider_factory,
698            ..self
699        }
700    }
701
702    /// Stop the inner engine.
703    async fn stop(&self) -> Result<()> {
704        self.workers.stop().await
705    }
706
707    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
708        self.workers
709            .get_region(region_id)
710            .context(RegionNotFoundSnafu { region_id })
711    }
712
713    /// Get metadata of a region.
714    ///
715    /// Returns error if the region doesn't exist.
716    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
717        // Reading a region doesn't need to go through the region worker thread.
718        let region = self.find_region(region_id)?;
719        Ok(region.metadata())
720    }
721
722    async fn open_topic_regions(
723        &self,
724        topic: String,
725        region_requests: Vec<(RegionId, RegionOpenRequest)>,
726    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
727        let now = Instant::now();
728        let region_ids = region_requests
729            .iter()
730            .map(|(region_id, _)| *region_id)
731            .collect::<Vec<_>>();
732        let provider = Provider::kafka_provider(topic);
733        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
734            provider.clone(),
735            self.wal_raw_entry_reader.clone(),
736            &region_ids,
737            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
738        );
739
740        let mut responses = Vec::with_capacity(region_requests.len());
741        for ((region_id, request), entry_receiver) in
742            region_requests.into_iter().zip(entry_receivers)
743        {
744            let (request, receiver) =
745                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
746            self.workers.submit_to_worker(region_id, request).await?;
747            responses.push(async move { receiver.await.context(RecvSnafu)? });
748        }
749
750        // Waits for entries distribution.
751        let distribution =
752            common_runtime::spawn_global(async move { distributor.distribute().await });
753        // Waits for worker returns.
754        let responses = join_all(responses).await;
755        distribution.await.context(JoinSnafu)??;
756
757        let num_failure = responses.iter().filter(|r| r.is_err()).count();
758        info!(
759            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
760            region_ids.len() - num_failure,
761            // Safety: provider is kafka provider.
762            provider.as_kafka_provider().unwrap(),
763            num_failure,
764            now.elapsed(),
765        );
766        Ok(region_ids.into_iter().zip(responses).collect())
767    }
768
769    async fn handle_batch_open_requests(
770        &self,
771        parallelism: usize,
772        requests: Vec<(RegionId, RegionOpenRequest)>,
773    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
774        let semaphore = Arc::new(Semaphore::new(parallelism));
775        let (topic_to_region_requests, remaining_region_requests) =
776            prepare_batch_open_requests(requests)?;
777        let mut responses =
778            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
779
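        // Open regions that share a Kafka topic together so the topic's WAL entries are
        // read once and distributed to the corresponding regions.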
780        if !topic_to_region_requests.is_empty() {
781            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
782            for (topic, region_requests) in topic_to_region_requests {
783                let semaphore_moved = semaphore.clone();
784                tasks.push(async move {
785                    // Safety: semaphore must exist
786                    let _permit = semaphore_moved.acquire().await.unwrap();
787                    self.open_topic_regions(topic, region_requests).await
788                })
789            }
790            let r = try_join_all(tasks).await?;
791            responses.extend(r.into_iter().flatten());
792        }
793
794        if !remaining_region_requests.is_empty() {
795            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
796            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
797            for (region_id, request) in remaining_region_requests {
798                let semaphore_moved = semaphore.clone();
799                region_ids.push(region_id);
800                tasks.push(async move {
801                    // Safety: semaphore must exist
802                    let _permit = semaphore_moved.acquire().await.unwrap();
803                    let (request, receiver) =
804                        WorkerRequest::new_open_region_request(region_id, request, None);
805
806                    self.workers.submit_to_worker(region_id, request).await?;
807
808                    receiver.await.context(RecvSnafu)?
809                })
810            }
811
812            let results = join_all(tasks).await;
813            responses.extend(region_ids.into_iter().zip(results));
814        }
815
816        Ok(responses)
817    }
818
819    async fn catchup_topic_regions(
820        &self,
821        provider: Provider,
822        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
823    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
824        let now = Instant::now();
825        let region_ids = region_requests
826            .iter()
827            .map(|(region_id, _)| *region_id)
828            .collect::<Vec<_>>();
829        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
830            provider.clone(),
831            self.wal_raw_entry_reader.clone(),
832            &region_ids,
833            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
834        );
835
836        let mut responses = Vec::with_capacity(region_requests.len());
837        for ((region_id, request), entry_receiver) in
838            region_requests.into_iter().zip(entry_receivers)
839        {
840            let (request, receiver) =
841                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
842            self.workers.submit_to_worker(region_id, request).await?;
843            responses.push(async move { receiver.await.context(RecvSnafu)? });
844        }
845
846        // Wait for entries distribution.
847        let distribution =
848            common_runtime::spawn_global(async move { distributor.distribute().await });
849        // Wait for worker returns.
850        let responses = join_all(responses).await;
851        distribution.await.context(JoinSnafu)??;
852
853        let num_failure = responses.iter().filter(|r| r.is_err()).count();
854        info!(
855            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
856            region_ids.len() - num_failure,
857            // Safety: provider is kafka provider.
858            provider.as_kafka_provider().unwrap(),
859            num_failure,
860            now.elapsed(),
861        );
862
863        Ok(region_ids.into_iter().zip(responses).collect())
864    }
865
866    async fn handle_batch_catchup_requests(
867        &self,
868        parallelism: usize,
869        requests: Vec<(RegionId, RegionCatchupRequest)>,
870    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
871        let mut responses = Vec::with_capacity(requests.len());
872        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
873        let mut remaining_region_requests = vec![];
874
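        // Group catchup requests by Kafka topic so each topic's WAL is replayed once;
        // regions without a Kafka provider are caught up individually.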
875        for (region_id, request) in requests {
876            match self.workers.get_region(region_id) {
877                Some(region) => match region.provider.as_kafka_provider() {
878                    Some(provider) => {
879                        topic_regions
880                            .entry(provider.clone())
881                            .or_default()
882                            .push((region_id, request));
883                    }
884                    None => {
885                        remaining_region_requests.push((region_id, request));
886                    }
887                },
888                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
889            }
890        }
891
892        let semaphore = Arc::new(Semaphore::new(parallelism));
893
894        if !topic_regions.is_empty() {
895            let mut tasks = Vec::with_capacity(topic_regions.len());
896            for (provider, region_requests) in topic_regions {
897                let semaphore_moved = semaphore.clone();
898                tasks.push(async move {
899                    // Safety: semaphore must exist
900                    let _permit = semaphore_moved.acquire().await.unwrap();
901                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
902                        .await
903                })
904            }
905
906            let r = try_join_all(tasks).await?;
907            responses.extend(r.into_iter().flatten());
908        }
909
910        if !remaining_region_requests.is_empty() {
911            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
912            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
913            for (region_id, request) in remaining_region_requests {
914                let semaphore_moved = semaphore.clone();
915                region_ids.push(region_id);
916                tasks.push(async move {
917                    // Safety: semaphore must exist
918                    let _permit = semaphore_moved.acquire().await.unwrap();
919                    let (request, receiver) =
920                        WorkerRequest::new_catchup_region_request(region_id, request, None);
921
922                    self.workers.submit_to_worker(region_id, request).await?;
923
924                    receiver.await.context(RecvSnafu)?
925                })
926            }
927
928            let results = join_all(tasks).await;
929            responses.extend(region_ids.into_iter().zip(results));
930        }
931
932        Ok(responses)
933    }
934
    /// Handles a [RegionRequest] and returns its execution result.
936    async fn handle_request(
937        &self,
938        region_id: RegionId,
939        request: RegionRequest,
940    ) -> Result<AffectedRows> {
941        let region_metadata = self.get_metadata(region_id).ok();
942        let (request, receiver) =
943            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
944        self.workers.submit_to_worker(region_id, request).await?;
945
946        receiver.await.context(RecvSnafu)?
947    }
948
    /// Returns the sequence number of the latest committed data.
950    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
951        // Reading a region doesn't need to go through the region worker thread.
952        self.find_region(region_id)
953            .map(|r| r.find_committed_sequence())
954    }
955
956    /// Handles the scan `request` and returns a [ScanRegion].
957    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
958        let query_start = Instant::now();
959        // Reading a region doesn't need to go through the region worker thread.
960        let region = self.find_region(region_id)?;
961        let version = region.version();
962        // Get cache.
963        let cache_manager = self.workers.cache_manager();
964
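        // Apply engine-level scan options: parallelism, the concurrent file limit, and
        // whether each kind of index is consulted during the scan.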
965        let scan_region = ScanRegion::new(
966            version,
967            region.access_layer.clone(),
968            request,
969            CacheStrategy::EnableAll(cache_manager),
970        )
971        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
972        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
973        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
974        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
975        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
976        .with_start_time(query_start);
977
978        #[cfg(feature = "enterprise")]
979        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
980
981        Ok(scan_region)
982    }
983
984    #[cfg(feature = "enterprise")]
985    fn maybe_fill_extension_range_provider(
986        &self,
987        mut scan_region: ScanRegion,
988        region: MitoRegionRef,
989    ) -> ScanRegion {
990        if region.is_follower()
991            && let Some(factory) = self.extension_range_provider_factory.as_ref()
992        {
993            scan_region
994                .set_extension_range_provider(factory.create_extension_range_provider(region));
995        }
996        scan_region
997    }
998
    /// Sets the [`RegionRole`] of the region.
1000    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
1001        let region = self.find_region(region_id)?;
1002        region.set_role(role);
1003        Ok(())
1004    }
1005
    /// Gracefully sets the region role state (e.g., read-only) and ensures no more writes are accepted by the region after it returns.
1007    async fn set_region_role_state_gracefully(
1008        &self,
1009        region_id: RegionId,
1010        region_role_state: SettableRegionRoleState,
1011    ) -> Result<SetRegionRoleStateResponse> {
        // Note: this requires mutable ownership of the region to ensure no other thread
        // modifies it concurrently; therefore, we submit the request to the worker.
1014        let (request, receiver) =
1015            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
1016        self.workers.submit_to_worker(region_id, request).await?;
1017
1018        receiver.await.context(RecvSnafu)
1019    }
1020
1021    async fn sync_region(
1022        &self,
1023        region_id: RegionId,
1024        manifest_info: RegionManifestInfo,
1025    ) -> Result<(ManifestVersion, bool)> {
1026        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
1027        let manifest_version = manifest_info.data_manifest_version();
1028        let (request, receiver) =
1029            WorkerRequest::new_sync_region_request(region_id, manifest_version);
1030        self.workers.submit_to_worker(region_id, request).await?;
1031
1032        receiver.await.context(RecvSnafu)?
1033    }
1034
1035    async fn remap_manifests(
1036        &self,
1037        request: RemapManifestsRequest,
1038    ) -> Result<RemapManifestsResponse> {
1039        let region_id = request.region_id;
1040        let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
1041        self.workers.submit_to_worker(region_id, request).await?;
1042        let manifests = receiver.await.context(RecvSnafu)??;
1043
1044        let new_manifests = manifests
1045            .into_iter()
1046            .map(|(region_id, manifest)| {
1047                Ok((
1048                    region_id,
1049                    serde_json::to_string(&manifest)
1050                        .context(SerializeManifestSnafu { region_id })?,
1051                ))
1052            })
1053            .collect::<Result<HashMap<_, _>>>()?;
1054        Ok(RemapManifestsResponse { new_manifests })
1055    }
1056
1057    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1058        self.workers.get_region(region_id).map(|region| {
1059            if region.is_follower() {
1060                RegionRole::Follower
1061            } else {
1062                RegionRole::Leader
1063            }
1064        })
1065    }
1066}
1067
1068#[async_trait]
1069impl RegionEngine for MitoEngine {
1070    fn name(&self) -> &str {
1071        MITO_ENGINE_NAME
1072    }
1073
1074    #[tracing::instrument(skip_all)]
1075    async fn handle_batch_open_requests(
1076        &self,
1077        parallelism: usize,
1078        requests: Vec<(RegionId, RegionOpenRequest)>,
1079    ) -> Result<BatchResponses, BoxedError> {
1080        // TODO(weny): add metrics.
1081        self.inner
1082            .handle_batch_open_requests(parallelism, requests)
1083            .await
1084            .map(|responses| {
1085                responses
1086                    .into_iter()
1087                    .map(|(region_id, response)| {
1088                        (
1089                            region_id,
1090                            response.map(RegionResponse::new).map_err(BoxedError::new),
1091                        )
1092                    })
1093                    .collect::<Vec<_>>()
1094            })
1095            .map_err(BoxedError::new)
1096    }
1097
1098    #[tracing::instrument(skip_all)]
1099    async fn handle_batch_catchup_requests(
1100        &self,
1101        parallelism: usize,
1102        requests: Vec<(RegionId, RegionCatchupRequest)>,
1103    ) -> Result<BatchResponses, BoxedError> {
1104        self.inner
1105            .handle_batch_catchup_requests(parallelism, requests)
1106            .await
1107            .map(|responses| {
1108                responses
1109                    .into_iter()
1110                    .map(|(region_id, response)| {
1111                        (
1112                            region_id,
1113                            response.map(RegionResponse::new).map_err(BoxedError::new),
1114                        )
1115                    })
1116                    .collect::<Vec<_>>()
1117            })
1118            .map_err(BoxedError::new)
1119    }
1120
1121    #[tracing::instrument(skip_all)]
1122    async fn handle_request(
1123        &self,
1124        region_id: RegionId,
1125        request: RegionRequest,
1126    ) -> Result<RegionResponse, BoxedError> {
1127        let _timer = HANDLE_REQUEST_ELAPSED
1128            .with_label_values(&[request.request_type()])
1129            .start_timer();
1130
1131        let is_alter = matches!(request, RegionRequest::Alter(_));
1132        let is_create = matches!(request, RegionRequest::Create(_));
1133        let mut response = self
1134            .inner
1135            .handle_request(region_id, request)
1136            .await
1137            .map(RegionResponse::new)
1138            .map_err(BoxedError::new)?;
1139
1140        if is_alter {
1141            self.handle_alter_response(region_id, &mut response)
1142                .map_err(BoxedError::new)?;
1143        } else if is_create {
1144            self.handle_create_response(region_id, &mut response)
1145                .map_err(BoxedError::new)?;
1146        }
1147
1148        Ok(response)
1149    }
1150
1151    #[tracing::instrument(skip_all)]
1152    async fn handle_query(
1153        &self,
1154        region_id: RegionId,
1155        request: ScanRequest,
1156    ) -> Result<RegionScannerRef, BoxedError> {
1157        self.scan_region(region_id, request)
1158            .map_err(BoxedError::new)?
1159            .region_scanner()
1160            .await
1161            .map_err(BoxedError::new)
1162    }
1163
1164    fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
1165        Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
1166    }
1167
1168    async fn get_committed_sequence(
1169        &self,
1170        region_id: RegionId,
1171    ) -> Result<SequenceNumber, BoxedError> {
1172        self.inner
1173            .get_committed_sequence(region_id)
1174            .map_err(BoxedError::new)
1175    }
1176
    /// Retrieves the region's metadata.
1178    async fn get_metadata(
1179        &self,
1180        region_id: RegionId,
1181    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
1182        self.inner.get_metadata(region_id).map_err(BoxedError::new)
1183    }
1184
1185    /// Stop the engine.
1186    ///
1187    /// Stopping the engine doesn't stop the underlying log store as other components might
1188    /// still use it. (When no other components are referencing the log store, it will
    /// automatically shut down.)
1190    async fn stop(&self) -> std::result::Result<(), BoxedError> {
1191        self.inner.stop().await.map_err(BoxedError::new)
1192    }
1193
1194    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
1195        self.get_region_statistic(region_id)
1196    }
1197
1198    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
1199        self.inner
1200            .set_region_role(region_id, role)
1201            .map_err(BoxedError::new)
1202    }
1203
1204    async fn set_region_role_state_gracefully(
1205        &self,
1206        region_id: RegionId,
1207        region_role_state: SettableRegionRoleState,
1208    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
1209        let _timer = HANDLE_REQUEST_ELAPSED
1210            .with_label_values(&["set_region_role_state_gracefully"])
1211            .start_timer();
1212
1213        self.inner
1214            .set_region_role_state_gracefully(region_id, region_role_state)
1215            .await
1216            .map_err(BoxedError::new)
1217    }
1218
1219    async fn sync_region(
1220        &self,
1221        region_id: RegionId,
1222        manifest_info: RegionManifestInfo,
1223    ) -> Result<SyncManifestResponse, BoxedError> {
1224        let (_, synced) = self
1225            .inner
1226            .sync_region(region_id, manifest_info)
1227            .await
1228            .map_err(BoxedError::new)?;
1229
1230        Ok(SyncManifestResponse::Mito { synced })
1231    }
1232
1233    async fn remap_manifests(
1234        &self,
1235        request: RemapManifestsRequest,
1236    ) -> Result<RemapManifestsResponse, BoxedError> {
1237        self.inner
1238            .remap_manifests(request)
1239            .await
1240            .map_err(BoxedError::new)
1241    }
1242
1243    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1244        self.inner.role(region_id)
1245    }
1246
1247    fn as_any(&self) -> &dyn Any {
1248        self
1249    }
1250}
1251
1252impl MitoEngine {
1253    fn handle_alter_response(
1254        &self,
1255        region_id: RegionId,
1256        response: &mut RegionResponse,
1257    ) -> Result<()> {
1258        if let Some(statistic) = self.region_statistic(region_id) {
1259            Self::encode_manifest_info_to_extensions(
1260                &region_id,
1261                statistic.manifest,
1262                &mut response.extensions,
1263            )?;
1264        }
1265        let column_metadatas = self
1266            .inner
1267            .find_region(region_id)
1268            .ok()
1269            .map(|r| r.metadata().column_metadatas.clone());
1270        if let Some(column_metadatas) = column_metadatas {
1271            Self::encode_column_metadatas_to_extensions(
1272                &region_id,
1273                column_metadatas,
1274                &mut response.extensions,
1275            )?;
1276        }
1277        Ok(())
1278    }
1279
1280    fn handle_create_response(
1281        &self,
1282        region_id: RegionId,
1283        response: &mut RegionResponse,
1284    ) -> Result<()> {
1285        let column_metadatas = self
1286            .inner
1287            .find_region(region_id)
1288            .ok()
1289            .map(|r| r.metadata().column_metadatas.clone());
1290        if let Some(column_metadatas) = column_metadatas {
1291            Self::encode_column_metadatas_to_extensions(
1292                &region_id,
1293                column_metadatas,
1294                &mut response.extensions,
1295            )?;
1296        }
1297        Ok(())
1298    }
1299}
1300
// Test-only methods.
1302#[cfg(any(test, feature = "test"))]
1303#[allow(clippy::too_many_arguments)]
1304impl MitoEngine {
1305    /// Returns a new [MitoEngine] for tests.
1306    pub async fn new_for_test<S: LogStore>(
1307        data_home: &str,
1308        mut config: MitoConfig,
1309        log_store: Arc<S>,
1310        object_store_manager: ObjectStoreManagerRef,
1311        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
1312        listener: Option<crate::engine::listener::EventListenerRef>,
1313        time_provider: crate::time_provider::TimeProviderRef,
1314        schema_metadata_manager: SchemaMetadataManagerRef,
1315        file_ref_manager: FileReferenceManagerRef,
1316        partition_expr_fetcher: PartitionExprFetcherRef,
1317    ) -> Result<MitoEngine> {
1318        config.sanitize(data_home)?;
1319
1320        let config = Arc::new(config);
1321        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
1322        let total_memory = get_total_memory_bytes().max(0) as u64;
1323        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
1324        let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
1325            .with_on_update(|usage| {
1326                SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
1327            })
1328            .with_on_reject(|| {
1329                SCAN_REQUESTS_REJECTED_TOTAL.inc();
1330            });
1331        Ok(MitoEngine {
1332            inner: Arc::new(EngineInner {
1333                workers: WorkerGroup::start_for_test(
1334                    config.clone(),
1335                    log_store,
1336                    object_store_manager,
1337                    write_buffer_manager,
1338                    listener,
1339                    schema_metadata_manager,
1340                    file_ref_manager,
1341                    time_provider,
1342                    partition_expr_fetcher,
1343                )
1344                .await?,
1345                config,
1346                wal_raw_entry_reader,
1347                scan_memory_tracker,
1348                #[cfg(feature = "enterprise")]
1349                extension_range_provider_factory: None,
1350            }),
1351        })
1352    }
1353
1354    /// Returns the purge scheduler.
1355    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
1356        self.inner.workers.purge_scheduler()
1357    }
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362    use std::time::Duration;
1363
1364    use super::*;
1365    use crate::sst::file::FileMeta;
1366
1367    #[test]
1368    fn test_is_valid_region_edit() {
1369        // Valid: has only "files_to_add"
1370        let edit = RegionEdit {
1371            files_to_add: vec![FileMeta::default()],
1372            files_to_remove: vec![],
1373            timestamp_ms: None,
1374            compaction_time_window: None,
1375            flushed_entry_id: None,
1376            flushed_sequence: None,
1377            committed_sequence: None,
1378        };
1379        assert!(is_valid_region_edit(&edit));
1380
1381        // Invalid: "files_to_add" is empty
1382        let edit = RegionEdit {
1383            files_to_add: vec![],
1384            files_to_remove: vec![],
1385            timestamp_ms: None,
1386            compaction_time_window: None,
1387            flushed_entry_id: None,
1388            flushed_sequence: None,
1389            committed_sequence: None,
1390        };
1391        assert!(!is_valid_region_edit(&edit));
1392
1393        // Invalid: "files_to_remove" is not empty
1394        let edit = RegionEdit {
1395            files_to_add: vec![FileMeta::default()],
1396            files_to_remove: vec![FileMeta::default()],
1397            timestamp_ms: None,
1398            compaction_time_window: None,
1399            flushed_entry_id: None,
1400            flushed_sequence: None,
1401            committed_sequence: None,
1402        };
1403        assert!(!is_valid_region_edit(&edit));
1404
1405        // Invalid: other fields are not all "None"s
1406        let edit = RegionEdit {
1407            files_to_add: vec![FileMeta::default()],
1408            files_to_remove: vec![],
1409            timestamp_ms: None,
1410            compaction_time_window: Some(Duration::from_secs(1)),
1411            flushed_entry_id: None,
1412            flushed_sequence: None,
1413            committed_sequence: None,
1414        };
1415        assert!(!is_valid_region_edit(&edit));
1416        let edit = RegionEdit {
1417            files_to_add: vec![FileMeta::default()],
1418            files_to_remove: vec![],
1419            timestamp_ms: None,
1420            compaction_time_window: None,
1421            flushed_entry_id: Some(1),
1422            flushed_sequence: None,
1423            committed_sequence: None,
1424        };
1425        assert!(!is_valid_region_edit(&edit));
1426        let edit = RegionEdit {
1427            files_to_add: vec![FileMeta::default()],
1428            files_to_remove: vec![],
1429            timestamp_ms: None,
1430            compaction_time_window: None,
1431            flushed_entry_id: None,
1432            flushed_sequence: Some(1),
1433            committed_sequence: None,
1434        };
1435        assert!(!is_valid_region_edit(&edit));
1436    }
1437}