mito2/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region engine.
16
17#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34pub(crate) mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59#[cfg(test)]
60mod row_selector_test;
61#[cfg(test)]
62mod scan_corrupt;
63#[cfg(test)]
64mod scan_test;
65#[cfg(test)]
66mod set_role_state_test;
67#[cfg(test)]
68mod staging_test;
69#[cfg(test)]
70mod sync_test;
71#[cfg(test)]
72mod truncate_test;
73
74mod puffin_index;
75
76use std::any::Any;
77use std::collections::HashMap;
78use std::sync::Arc;
79use std::time::Instant;
80
81use api::region::RegionResponse;
82use async_trait::async_trait;
83use common_base::Plugins;
84use common_error::ext::BoxedError;
85use common_meta::key::SchemaMetadataManagerRef;
86use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
87use common_stat::get_total_memory_bytes;
88use common_telemetry::{info, tracing, warn};
89use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
90use futures::future::{join_all, try_join_all};
91use futures::stream::{self, Stream, StreamExt};
92use object_store::manager::ObjectStoreManagerRef;
93use snafu::{OptionExt, ResultExt, ensure};
94use store_api::ManifestVersion;
95use store_api::codec::PrimaryKeyEncoding;
96use store_api::logstore::LogStore;
97use store_api::logstore::provider::{KafkaProvider, Provider};
98use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
99use store_api::metric_engine_consts::{
100    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
101};
102use store_api::region_engine::{
103    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
104    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
105};
106use store_api::region_request::{
107    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
108};
109use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
110use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
111use tokio::sync::{Semaphore, oneshot};
112
113use crate::access_layer::RegionFilePathFactory;
114use crate::cache::{CacheManagerRef, CacheStrategy};
115use crate::config::MitoConfig;
116use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
117use crate::error::{
118    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
119    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
120};
121#[cfg(feature = "enterprise")]
122use crate::extension::BoxedExtensionRangeProviderFactory;
123use crate::gc::GcLimiterRef;
124use crate::manifest::action::RegionEdit;
125use crate::memtable::MemtableStats;
126use crate::metrics::{
127    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
128};
129use crate::read::scan_region::{ScanRegion, Scanner};
130use crate::read::stream::ScanBatchStream;
131use crate::region::MitoRegionRef;
132use crate::region::opener::PartitionExprFetcherRef;
133use crate::request::{RegionEditRequest, WorkerRequest};
134use crate::sst::file::{FileMeta, RegionFileId};
135use crate::sst::file_ref::FileReferenceManagerRef;
136use crate::wal::entry_distributor::{
137    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
138};
139use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
140use crate::worker::WorkerGroup;
141
142pub const MITO_ENGINE_NAME: &str = "mito";
143
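/// Builder for [MitoEngine].
///
/// A minimal usage sketch (illustrative only; it assumes `data_home`, `log_store`,
/// `object_store_manager`, `schema_metadata_manager`, `file_ref_manager`,
/// `partition_expr_fetcher` and `plugins` are already constructed by the caller):
///
/// ```ignore
/// let engine = MitoEngineBuilder::new(
///     data_home,
///     MitoConfig::default(),
///     log_store,
///     object_store_manager,
///     schema_metadata_manager,
///     file_ref_manager,
///     partition_expr_fetcher,
///     plugins,
///     0, // no limit on concurrent queries
/// )
/// .try_build()
/// .await?;
/// ```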
144pub struct MitoEngineBuilder<'a, S: LogStore> {
145    data_home: &'a str,
146    config: MitoConfig,
147    log_store: Arc<S>,
148    object_store_manager: ObjectStoreManagerRef,
149    schema_metadata_manager: SchemaMetadataManagerRef,
150    file_ref_manager: FileReferenceManagerRef,
151    partition_expr_fetcher: PartitionExprFetcherRef,
152    plugins: Plugins,
153    max_concurrent_queries: usize,
154    #[cfg(feature = "enterprise")]
155    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
156}
157
158impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
159    #[allow(clippy::too_many_arguments)]
160    pub fn new(
161        data_home: &'a str,
162        config: MitoConfig,
163        log_store: Arc<S>,
164        object_store_manager: ObjectStoreManagerRef,
165        schema_metadata_manager: SchemaMetadataManagerRef,
166        file_ref_manager: FileReferenceManagerRef,
167        partition_expr_fetcher: PartitionExprFetcherRef,
168        plugins: Plugins,
169        max_concurrent_queries: usize,
170    ) -> Self {
171        Self {
172            data_home,
173            config,
174            log_store,
175            object_store_manager,
176            schema_metadata_manager,
177            file_ref_manager,
178            plugins,
179            partition_expr_fetcher,
180            max_concurrent_queries,
181            #[cfg(feature = "enterprise")]
182            extension_range_provider_factory: None,
183        }
184    }
185
186    #[cfg(feature = "enterprise")]
187    #[must_use]
188    pub fn with_extension_range_provider_factory(
189        self,
190        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
191    ) -> Self {
192        Self {
193            extension_range_provider_factory,
194            ..self
195        }
196    }
197
198    pub async fn try_build(mut self) -> Result<MitoEngine> {
199        self.config.sanitize(self.data_home)?;
200
201        let config = Arc::new(self.config);
202        let workers = WorkerGroup::start(
203            config.clone(),
204            self.log_store.clone(),
205            self.object_store_manager,
206            self.schema_metadata_manager,
207            self.file_ref_manager,
208            self.partition_expr_fetcher.clone(),
209            self.plugins,
210        )
211        .await?;
212        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
213        let total_memory = get_total_memory_bytes().max(0) as u64;
214        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
215        let scan_memory_tracker =
216            QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
217                .with_on_update(|usage| {
218                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
219                })
220                .with_on_reject(|| {
221                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
222                });
223
224        let inner = EngineInner {
225            workers,
226            config,
227            wal_raw_entry_reader,
228            scan_memory_tracker,
229            #[cfg(feature = "enterprise")]
230            extension_range_provider_factory: None,
231        };
232
233        #[cfg(feature = "enterprise")]
234        let inner =
235            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
236
237        Ok(MitoEngine {
238            inner: Arc::new(inner),
239        })
240    }
241}
242
243/// Region engine implementation for timeseries data.
244#[derive(Clone)]
245pub struct MitoEngine {
246    inner: Arc<EngineInner>,
247}
248
249impl MitoEngine {
250    /// Returns a new [MitoEngine] with the given `config`, `log_store` and `object_store_manager`.
251    #[allow(clippy::too_many_arguments)]
252    pub async fn new<S: LogStore>(
253        data_home: &str,
254        config: MitoConfig,
255        log_store: Arc<S>,
256        object_store_manager: ObjectStoreManagerRef,
257        schema_metadata_manager: SchemaMetadataManagerRef,
258        file_ref_manager: FileReferenceManagerRef,
259        partition_expr_fetcher: PartitionExprFetcherRef,
260        plugins: Plugins,
261    ) -> Result<MitoEngine> {
262        let builder = MitoEngineBuilder::new(
263            data_home,
264            config,
265            log_store,
266            object_store_manager,
267            schema_metadata_manager,
268            file_ref_manager,
269            partition_expr_fetcher,
270            plugins,
271            0, // Default: no limit on concurrent queries
272        );
273        builder.try_build().await
274    }
275
276    pub fn mito_config(&self) -> &MitoConfig {
277        &self.inner.config
278    }
279
280    pub fn cache_manager(&self) -> CacheManagerRef {
281        self.inner.workers.cache_manager()
282    }
283
284    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
285        self.inner.workers.file_ref_manager()
286    }
287
288    pub fn gc_limiter(&self) -> GcLimiterRef {
289        self.inner.workers.gc_limiter()
290    }
291
292    /// Gets all temporary ref files for the given region ids, excluding files that are already in the manifest.
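    ///
    /// A usage sketch (illustrative; `region_id` is assumed to be a region opened on this
    /// datanode, and no extra manifest regions are queried):
    ///
    /// ```ignore
    /// let file_refs = engine
    ///     .get_snapshot_of_file_refs(vec![region_id], HashMap::new())
    ///     .await?;
    /// ```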
293    pub async fn get_snapshot_of_file_refs(
294        &self,
295        file_handle_regions: impl IntoIterator<Item = RegionId>,
296        manifest_regions: HashMap<RegionId, Vec<RegionId>>,
297    ) -> Result<FileRefsManifest> {
298        let file_ref_mgr = self.file_ref_manager();
299
300        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
301        // Convert region IDs to MitoRegionRef objects, ignoring regions that do not exist on the
302        // current datanode, as regions on other datanodes are not managed by this engine.
303        let query_regions: Vec<MitoRegionRef> = file_handle_regions
304            .into_iter()
305            .filter_map(|region_id| self.find_region(region_id))
306            .collect();
307
308        let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
309            .into_iter()
310            .filter_map(|(related_region, queries)| {
311                self.find_region(related_region).map(|r| (r, queries))
312            })
313            .collect();
314
315        file_ref_mgr
316            .get_snapshot_of_file_refs(query_regions, related_regions)
317            .await
318    }
319
320    /// Returns true if the specific region exists.
321    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
322        self.inner.workers.is_region_exists(region_id)
323    }
324
325    /// Returns true if the specific region is opening.
326    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
327        self.inner.workers.is_region_opening(region_id)
328    }
329
330    /// Returns true if the specific region is catching up.
331    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
332        self.inner.workers.is_region_catching_up(region_id)
333    }
334
335    /// Returns the region disk/memory statistic.
336    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
337        self.find_region(region_id)
338            .map(|region| region.region_statistic())
339    }
340
341    /// Returns the primary key encoding of the region.
342    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
343        self.find_region(region_id)
344            .map(|r| r.primary_key_encoding())
345    }
346
347    /// Handles a substrait query and returns a stream of record batches.
348    ///
349    /// Notice that the output stream's ordering is not guaranteed. If the order
350    /// matters, please use [`scanner`] to build a [`Scanner`] to consume.
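    ///
    /// A usage sketch (illustrative; `engine` and `region_id` are assumed to refer to a
    /// region opened on this datanode):
    ///
    /// ```ignore
    /// let stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// ```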
351    #[tracing::instrument(skip_all)]
352    pub async fn scan_to_stream(
353        &self,
354        region_id: RegionId,
355        request: ScanRequest,
356    ) -> Result<SendableRecordBatchStream, BoxedError> {
357        self.scanner(region_id, request)
358            .await
359            .map_err(BoxedError::new)?
360            .scan()
361            .await
362    }
363
364    /// Scans [`Batch`]es by [`ScanRequest`].
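    ///
    /// A usage sketch (illustrative; `filter_deleted` controls whether deletion markers are
    /// filtered out of the returned batches):
    ///
    /// ```ignore
    /// let batch_stream = engine
    ///     .scan_batch(region_id, ScanRequest::default(), true)
    ///     .await?;
    /// ```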
365    pub async fn scan_batch(
366        &self,
367        region_id: RegionId,
368        request: ScanRequest,
369        filter_deleted: bool,
370    ) -> Result<ScanBatchStream> {
371        let mut scan_region = self.scan_region(region_id, request)?;
372        scan_region.set_filter_deleted(filter_deleted);
373        scan_region.scanner().await?.scan_batch()
374    }
375
376    /// Returns a scanner to scan for `request`.
377    pub(crate) async fn scanner(
378        &self,
379        region_id: RegionId,
380        request: ScanRequest,
381    ) -> Result<Scanner> {
382        self.scan_region(region_id, request)?.scanner().await
383    }
384
385    /// Scans a region.
386    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
387        self.inner.scan_region(region_id, request)
388    }
389
390    /// Edits a region's metadata by applying a [RegionEdit] directly. Use with care.
391    /// Currently we only allow adding files to a region (the [RegionEdit] struct may only contain a non-empty "files_to_add" field).
392    /// Any other editing intention will result in an "invalid request" error.
393    /// Also note that once a region has been edited directly, we MUST not write data to it thereafter.
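    ///
    /// A sketch of the only accepted kind of edit (illustrative; `file_meta` stands for a
    /// [FileMeta] of an SST file that is assumed to already exist in storage):
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     timestamp_ms: None,
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    ///     committed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```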
394    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
395        let _timer = HANDLE_REQUEST_ELAPSED
396            .with_label_values(&["edit_region"])
397            .start_timer();
398
399        ensure!(
400            is_valid_region_edit(&edit),
401            InvalidRequestSnafu {
402                region_id,
403                reason: "invalid region edit"
404            }
405        );
406
407        let (tx, rx) = oneshot::channel();
408        let request = WorkerRequest::EditRegion(RegionEditRequest {
409            region_id,
410            edit,
411            tx,
412        });
413        self.inner
414            .workers
415            .submit_to_worker(region_id, request)
416            .await?;
417        rx.await.context(RecvSnafu)?
418    }
419
420    #[cfg(test)]
421    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
422        self.find_region(id)
423    }
424
425    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
426        self.inner.workers.get_region(region_id)
427    }
428
429    fn encode_manifest_info_to_extensions(
430        region_id: &RegionId,
431        manifest_info: RegionManifestInfo,
432        extensions: &mut HashMap<String, Vec<u8>>,
433    ) -> Result<()> {
434        let region_manifest_info = vec![(*region_id, manifest_info)];
435
436        extensions.insert(
437            MANIFEST_INFO_EXTENSION_KEY.to_string(),
438            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
439        );
440        info!(
441            "Added manifest info: {:?} to extensions, region_id: {:?}",
442            region_manifest_info, region_id
443        );
444        Ok(())
445    }
446
447    fn encode_column_metadatas_to_extensions(
448        region_id: &RegionId,
449        column_metadatas: Vec<ColumnMetadata>,
450        extensions: &mut HashMap<String, Vec<u8>>,
451    ) -> Result<()> {
452        extensions.insert(
453            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
454            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
455        );
456        info!(
457            "Added column metadatas: {:?} to extensions, region_id: {:?}",
458            column_metadatas, region_id
459        );
460        Ok(())
461    }
462
463    /// Finds the memtable and SST stats of the region's current version by `region_id`.
464    /// The stats must be collected at a single point in time to ensure data consistency.
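    ///
    /// A usage sketch (illustrative):
    ///
    /// ```ignore
    /// let (memtable_stats, sst_stats) = engine.find_memtable_and_sst_stats(region_id)?;
    /// ```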
465    pub fn find_memtable_and_sst_stats(
466        &self,
467        region_id: RegionId,
468    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
469        let region = self
470            .find_region(region_id)
471            .context(RegionNotFoundSnafu { region_id })?;
472
473        let version = region.version();
474        let memtable_stats = version
475            .memtables
476            .list_memtables()
477            .iter()
478            .map(|x| x.stats())
479            .collect::<Vec<_>>();
480
481        let sst_stats = version
482            .ssts
483            .levels()
484            .iter()
485            .flat_map(|level| level.files().map(|x| x.meta_ref()))
486            .cloned()
487            .collect::<Vec<_>>();
488        Ok((memtable_stats, sst_stats))
489    }
490
491    /// Lists all SSTs from the manifest of all regions in the engine.
492    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
493        let node_id = self.inner.workers.file_ref_manager().node_id();
494        let regions = self.inner.workers.all_regions();
495
496        let mut results = Vec::new();
497        for region in regions {
498            let mut entries = region.manifest_sst_entries().await;
499            for e in &mut entries {
500                e.node_id = node_id;
501            }
502            results.extend(entries);
503        }
504
505        results
506    }
507
508    /// Lists metadata about all puffin index targets stored in the engine.
509    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
510        let node_id = self.inner.workers.file_ref_manager().node_id();
511        let cache_manager = self.inner.workers.cache_manager();
512        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
513        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
514        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
515
516        let mut results = Vec::new();
517
518        for region in self.inner.workers.all_regions() {
519            let manifest_entries = region.manifest_sst_entries().await;
520            let access_layer = region.access_layer.clone();
521            let table_dir = access_layer.table_dir().to_string();
522            let path_type = access_layer.path_type();
523            let object_store = access_layer.object_store().clone();
524            let puffin_factory = access_layer.puffin_manager_factory().clone();
525            let path_factory = RegionFilePathFactory::new(table_dir, path_type);
526
527            let entry_futures = manifest_entries.into_iter().map(|entry| {
528                let object_store = object_store.clone();
529                let path_factory = path_factory.clone();
530                let puffin_factory = puffin_factory.clone();
531                let puffin_metadata_cache = puffin_metadata_cache.clone();
532                let bloom_filter_cache = bloom_filter_cache.clone();
533                let inverted_index_cache = inverted_index_cache.clone();
534
535                async move {
536                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
537                        return Vec::new();
538                    };
539
540                    let Some(index_file_id) = entry.index_file_id.as_ref() else {
541                        return Vec::new();
542                    };
543                    let file_id = match FileId::parse_str(index_file_id) {
544                        Ok(file_id) => file_id,
545                        Err(err) => {
546                            warn!(
547                                err;
548                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
549                                entry.table_dir,
550                                index_file_id
551                            );
552                            return Vec::new();
553                        }
554                    };
555                    let region_file_id = RegionFileId::new(entry.region_id, file_id);
556                    let context = IndexEntryContext {
557                        table_dir: &entry.table_dir,
558                        index_file_path: index_file_path.as_str(),
559                        region_id: entry.region_id,
560                        table_id: entry.table_id,
561                        region_number: entry.region_number,
562                        region_group: entry.region_group,
563                        region_sequence: entry.region_sequence,
564                        file_id: index_file_id,
565                        index_file_size: entry.index_file_size,
566                        node_id,
567                    };
568
569                    let manager = puffin_factory
570                        .build(object_store, path_factory)
571                        .with_puffin_metadata_cache(puffin_metadata_cache);
572
573                    collect_index_entries_from_puffin(
574                        manager,
575                        region_file_id,
576                        context,
577                        bloom_filter_cache,
578                        inverted_index_cache,
579                    )
580                    .await
581                }
582            });
583
584            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8.
585            while let Some(mut metas) = meta_stream.next().await {
586                results.append(&mut metas);
587            }
588        }
589
590        results
591    }
592
593    /// Lists all SSTs from the storage layer of all regions in the engine.
594    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
595        let node_id = self.inner.workers.file_ref_manager().node_id();
596        let regions = self.inner.workers.all_regions();
597
598        let mut layers_distinct_table_dirs = HashMap::new();
599        for region in regions {
600            let table_dir = region.access_layer.table_dir();
601            if !layers_distinct_table_dirs.contains_key(table_dir) {
602                layers_distinct_table_dirs
603                    .insert(table_dir.to_string(), region.access_layer.clone());
604            }
605        }
606
607        stream::iter(layers_distinct_table_dirs)
608            .map(|(_, access_layer)| access_layer.storage_sst_entries())
609            .flatten()
610            .map(move |entry| {
611                entry.map(move |mut entry| {
612                    entry.node_id = node_id;
613                    entry
614                })
615            })
616    }
617}
618
619/// Checks whether the region edit is valid. Only adding files to a region is considered valid now.
620fn is_valid_region_edit(edit: &RegionEdit) -> bool {
621    !edit.files_to_add.is_empty()
622        && edit.files_to_remove.is_empty()
623        && matches!(
624            edit,
625            RegionEdit {
626                files_to_add: _,
627                files_to_remove: _,
628                timestamp_ms: _,
629                compaction_time_window: None,
630                flushed_entry_id: None,
631                flushed_sequence: None,
632                ..
633            }
634        )
635}
636
637/// Inner struct of [MitoEngine].
638struct EngineInner {
639    /// Region workers group.
640    workers: WorkerGroup,
641    /// Config of the engine.
642    config: Arc<MitoConfig>,
643    /// The WAL raw entry reader.
644    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
645    /// Memory tracker for table scans.
646    scan_memory_tracker: QueryMemoryTracker,
647    #[cfg(feature = "enterprise")]
648    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
649}
650
651type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
652
653/// Returns requests ([TopicGroupedRegionOpenRequests]) grouped by topic, plus the remaining requests.
654fn prepare_batch_open_requests(
655    requests: Vec<(RegionId, RegionOpenRequest)>,
656) -> Result<(
657    TopicGroupedRegionOpenRequests,
658    Vec<(RegionId, RegionOpenRequest)>,
659)> {
660    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
661    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
662    for (region_id, request) in requests {
663        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
664            serde_json::from_str(options).context(SerdeJsonSnafu)?
665        } else {
666            WalOptions::RaftEngine
667        };
668        match options {
669            WalOptions::Kafka(options) => {
670                topic_to_regions
671                    .entry(options.topic)
672                    .or_default()
673                    .push((region_id, request));
674            }
675            WalOptions::RaftEngine | WalOptions::Noop => {
676                remaining_regions.push((region_id, request));
677            }
678        }
679    }
680
681    Ok((topic_to_regions, remaining_regions))
682}
683
684impl EngineInner {
685    #[cfg(feature = "enterprise")]
686    #[must_use]
687    fn with_extension_range_provider_factory(
688        self,
689        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
690    ) -> Self {
691        Self {
692            extension_range_provider_factory,
693            ..self
694        }
695    }
696
697    /// Stops the inner engine.
698    async fn stop(&self) -> Result<()> {
699        self.workers.stop().await
700    }
701
702    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
703        self.workers
704            .get_region(region_id)
705            .context(RegionNotFoundSnafu { region_id })
706    }
707
708    /// Gets the metadata of a region.
709    ///
710    /// Returns error if the region doesn't exist.
711    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
712        // Reading a region doesn't need to go through the region worker thread.
713        let region = self.find_region(region_id)?;
714        Ok(region.metadata())
715    }
716
717    async fn open_topic_regions(
718        &self,
719        topic: String,
720        region_requests: Vec<(RegionId, RegionOpenRequest)>,
721    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
722        let now = Instant::now();
723        let region_ids = region_requests
724            .iter()
725            .map(|(region_id, _)| *region_id)
726            .collect::<Vec<_>>();
727        let provider = Provider::kafka_provider(topic);
728        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
729            provider.clone(),
730            self.wal_raw_entry_reader.clone(),
731            &region_ids,
732            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
733        );
734
735        let mut responses = Vec::with_capacity(region_requests.len());
736        for ((region_id, request), entry_receiver) in
737            region_requests.into_iter().zip(entry_receivers)
738        {
739            let (request, receiver) =
740                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
741            self.workers.submit_to_worker(region_id, request).await?;
742            responses.push(async move { receiver.await.context(RecvSnafu)? });
743        }
744
745        // Waits for entries distribution.
746        let distribution =
747            common_runtime::spawn_global(async move { distributor.distribute().await });
748        // Waits for worker returns.
749        let responses = join_all(responses).await;
750        distribution.await.context(JoinSnafu)??;
751
752        let num_failure = responses.iter().filter(|r| r.is_err()).count();
753        info!(
754            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
755            region_ids.len() - num_failure,
756            // Safety: provider is kafka provider.
757            provider.as_kafka_provider().unwrap(),
758            num_failure,
759            now.elapsed(),
760        );
761        Ok(region_ids.into_iter().zip(responses).collect())
762    }
763
764    async fn handle_batch_open_requests(
765        &self,
766        parallelism: usize,
767        requests: Vec<(RegionId, RegionOpenRequest)>,
768    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
769        let semaphore = Arc::new(Semaphore::new(parallelism));
770        let (topic_to_region_requests, remaining_region_requests) =
771            prepare_batch_open_requests(requests)?;
772        let mut responses =
773            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
774
775        if !topic_to_region_requests.is_empty() {
776            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
777            for (topic, region_requests) in topic_to_region_requests {
778                let semaphore_moved = semaphore.clone();
779                tasks.push(async move {
780                    // Safety: semaphore must exist
781                    let _permit = semaphore_moved.acquire().await.unwrap();
782                    self.open_topic_regions(topic, region_requests).await
783                })
784            }
785            let r = try_join_all(tasks).await?;
786            responses.extend(r.into_iter().flatten());
787        }
788
789        if !remaining_region_requests.is_empty() {
790            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
791            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
792            for (region_id, request) in remaining_region_requests {
793                let semaphore_moved = semaphore.clone();
794                region_ids.push(region_id);
795                tasks.push(async move {
796                    // Safety: semaphore must exist
797                    let _permit = semaphore_moved.acquire().await.unwrap();
798                    let (request, receiver) =
799                        WorkerRequest::new_open_region_request(region_id, request, None);
800
801                    self.workers.submit_to_worker(region_id, request).await?;
802
803                    receiver.await.context(RecvSnafu)?
804                })
805            }
806
807            let results = join_all(tasks).await;
808            responses.extend(region_ids.into_iter().zip(results));
809        }
810
811        Ok(responses)
812    }
813
814    async fn catchup_topic_regions(
815        &self,
816        provider: Provider,
817        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
818    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
819        let now = Instant::now();
820        let region_ids = region_requests
821            .iter()
822            .map(|(region_id, _)| *region_id)
823            .collect::<Vec<_>>();
824        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
825            provider.clone(),
826            self.wal_raw_entry_reader.clone(),
827            &region_ids,
828            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
829        );
830
831        let mut responses = Vec::with_capacity(region_requests.len());
832        for ((region_id, request), entry_receiver) in
833            region_requests.into_iter().zip(entry_receivers)
834        {
835            let (request, receiver) =
836                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
837            self.workers.submit_to_worker(region_id, request).await?;
838            responses.push(async move { receiver.await.context(RecvSnafu)? });
839        }
840
841        // Wait for entries distribution.
842        let distribution =
843            common_runtime::spawn_global(async move { distributor.distribute().await });
844        // Wait for worker returns.
845        let responses = join_all(responses).await;
846        distribution.await.context(JoinSnafu)??;
847
848        let num_failure = responses.iter().filter(|r| r.is_err()).count();
849        info!(
850            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
851            region_ids.len() - num_failure,
852            // Safety: provider is kafka provider.
853            provider.as_kafka_provider().unwrap(),
854            num_failure,
855            now.elapsed(),
856        );
857
858        Ok(region_ids.into_iter().zip(responses).collect())
859    }
860
861    async fn handle_batch_catchup_requests(
862        &self,
863        parallelism: usize,
864        requests: Vec<(RegionId, RegionCatchupRequest)>,
865    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
866        let mut responses = Vec::with_capacity(requests.len());
867        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
868        let mut remaining_region_requests = vec![];
869
870        for (region_id, request) in requests {
871            match self.workers.get_region(region_id) {
872                Some(region) => match region.provider.as_kafka_provider() {
873                    Some(provider) => {
874                        topic_regions
875                            .entry(provider.clone())
876                            .or_default()
877                            .push((region_id, request));
878                    }
879                    None => {
880                        remaining_region_requests.push((region_id, request));
881                    }
882                },
883                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
884            }
885        }
886
887        let semaphore = Arc::new(Semaphore::new(parallelism));
888
889        if !topic_regions.is_empty() {
890            let mut tasks = Vec::with_capacity(topic_regions.len());
891            for (provider, region_requests) in topic_regions {
892                let semaphore_moved = semaphore.clone();
893                tasks.push(async move {
894                    // Safety: semaphore must exist
895                    let _permit = semaphore_moved.acquire().await.unwrap();
896                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
897                        .await
898                })
899            }
900
901            let r = try_join_all(tasks).await?;
902            responses.extend(r.into_iter().flatten());
903        }
904
905        if !remaining_region_requests.is_empty() {
906            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
907            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
908            for (region_id, request) in remaining_region_requests {
909                let semaphore_moved = semaphore.clone();
910                region_ids.push(region_id);
911                tasks.push(async move {
912                    // Safety: semaphore must exist
913                    let _permit = semaphore_moved.acquire().await.unwrap();
914                    let (request, receiver) =
915                        WorkerRequest::new_catchup_region_request(region_id, request, None);
916
917                    self.workers.submit_to_worker(region_id, request).await?;
918
919                    receiver.await.context(RecvSnafu)?
920                })
921            }
922
923            let results = join_all(tasks).await;
924            responses.extend(region_ids.into_iter().zip(results));
925        }
926
927        Ok(responses)
928    }
929
930    /// Handles a [RegionRequest] and returns its execution result.
931    async fn handle_request(
932        &self,
933        region_id: RegionId,
934        request: RegionRequest,
935    ) -> Result<AffectedRows> {
936        let region_metadata = self.get_metadata(region_id).ok();
937        let (request, receiver) =
938            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
939        self.workers.submit_to_worker(region_id, request).await?;
940
941        receiver.await.context(RecvSnafu)?
942    }
943
944    /// Returns the sequence number of the latest committed data.
945    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
946        // Reading a region doesn't need to go through the region worker thread.
947        self.find_region(region_id)
948            .map(|r| r.find_committed_sequence())
949    }
950
951    /// Handles the scan `request` and returns a [ScanRegion].
952    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
953        let query_start = Instant::now();
954        // Reading a region doesn't need to go through the region worker thread.
955        let region = self.find_region(region_id)?;
956        let version = region.version();
957        // Get cache.
958        let cache_manager = self.workers.cache_manager();
959
960        let scan_region = ScanRegion::new(
961            version,
962            region.access_layer.clone(),
963            request,
964            CacheStrategy::EnableAll(cache_manager),
965        )
966        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
967        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
968        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
969        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
970        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
971        .with_start_time(query_start);
972
973        #[cfg(feature = "enterprise")]
974        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
975
976        Ok(scan_region)
977    }
978
979    #[cfg(feature = "enterprise")]
980    fn maybe_fill_extension_range_provider(
981        &self,
982        mut scan_region: ScanRegion,
983        region: MitoRegionRef,
984    ) -> ScanRegion {
985        if region.is_follower()
986            && let Some(factory) = self.extension_range_provider_factory.as_ref()
987        {
988            scan_region
989                .set_extension_range_provider(factory.create_extension_range_provider(region));
990        }
991        scan_region
992    }
993
994    /// Sets the [`RegionRole`] of the region.
995    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
996        let region = self.find_region(region_id)?;
997        region.set_role(role);
998        Ok(())
999    }
1000
1001    /// Sets the region to read-only and ensures no more writes to the region after it returns.
1002    async fn set_region_role_state_gracefully(
1003        &self,
1004        region_id: RegionId,
1005        region_role_state: SettableRegionRoleState,
1006    ) -> Result<SetRegionRoleStateResponse> {
1007        // Note: this acquires mutable ownership of the region to ensure no other threads can write to it.
1008        // Therefore, we submit it to the worker.
1009        let (request, receiver) =
1010            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
1011        self.workers.submit_to_worker(region_id, request).await?;
1012
1013        receiver.await.context(RecvSnafu)
1014    }
1015
1016    async fn sync_region(
1017        &self,
1018        region_id: RegionId,
1019        manifest_info: RegionManifestInfo,
1020    ) -> Result<(ManifestVersion, bool)> {
1021        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
1022        let manifest_version = manifest_info.data_manifest_version();
1023        let (request, receiver) =
1024            WorkerRequest::new_sync_region_request(region_id, manifest_version);
1025        self.workers.submit_to_worker(region_id, request).await?;
1026
1027        receiver.await.context(RecvSnafu)?
1028    }
1029
1030    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1031        self.workers.get_region(region_id).map(|region| {
1032            if region.is_follower() {
1033                RegionRole::Follower
1034            } else {
1035                RegionRole::Leader
1036            }
1037        })
1038    }
1039}
1040
1041#[async_trait]
1042impl RegionEngine for MitoEngine {
1043    fn name(&self) -> &str {
1044        MITO_ENGINE_NAME
1045    }
1046
1047    #[tracing::instrument(skip_all)]
1048    async fn handle_batch_open_requests(
1049        &self,
1050        parallelism: usize,
1051        requests: Vec<(RegionId, RegionOpenRequest)>,
1052    ) -> Result<BatchResponses, BoxedError> {
1053        // TODO(weny): add metrics.
1054        self.inner
1055            .handle_batch_open_requests(parallelism, requests)
1056            .await
1057            .map(|responses| {
1058                responses
1059                    .into_iter()
1060                    .map(|(region_id, response)| {
1061                        (
1062                            region_id,
1063                            response.map(RegionResponse::new).map_err(BoxedError::new),
1064                        )
1065                    })
1066                    .collect::<Vec<_>>()
1067            })
1068            .map_err(BoxedError::new)
1069    }
1070
1071    #[tracing::instrument(skip_all)]
1072    async fn handle_batch_catchup_requests(
1073        &self,
1074        parallelism: usize,
1075        requests: Vec<(RegionId, RegionCatchupRequest)>,
1076    ) -> Result<BatchResponses, BoxedError> {
1077        self.inner
1078            .handle_batch_catchup_requests(parallelism, requests)
1079            .await
1080            .map(|responses| {
1081                responses
1082                    .into_iter()
1083                    .map(|(region_id, response)| {
1084                        (
1085                            region_id,
1086                            response.map(RegionResponse::new).map_err(BoxedError::new),
1087                        )
1088                    })
1089                    .collect::<Vec<_>>()
1090            })
1091            .map_err(BoxedError::new)
1092    }
1093
1094    #[tracing::instrument(skip_all)]
1095    async fn handle_request(
1096        &self,
1097        region_id: RegionId,
1098        request: RegionRequest,
1099    ) -> Result<RegionResponse, BoxedError> {
1100        let _timer = HANDLE_REQUEST_ELAPSED
1101            .with_label_values(&[request.request_type()])
1102            .start_timer();
1103
1104        let is_alter = matches!(request, RegionRequest::Alter(_));
1105        let is_create = matches!(request, RegionRequest::Create(_));
1106        let mut response = self
1107            .inner
1108            .handle_request(region_id, request)
1109            .await
1110            .map(RegionResponse::new)
1111            .map_err(BoxedError::new)?;
1112
1113        if is_alter {
1114            self.handle_alter_response(region_id, &mut response)
1115                .map_err(BoxedError::new)?;
1116        } else if is_create {
1117            self.handle_create_response(region_id, &mut response)
1118                .map_err(BoxedError::new)?;
1119        }
1120
1121        Ok(response)
1122    }
1123
1124    #[tracing::instrument(skip_all)]
1125    async fn handle_query(
1126        &self,
1127        region_id: RegionId,
1128        request: ScanRequest,
1129    ) -> Result<RegionScannerRef, BoxedError> {
1130        self.scan_region(region_id, request)
1131            .map_err(BoxedError::new)?
1132            .region_scanner()
1133            .await
1134            .map_err(BoxedError::new)
1135    }
1136
1137    fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
1138        Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
1139    }
1140
1141    async fn get_committed_sequence(
1142        &self,
1143        region_id: RegionId,
1144    ) -> Result<SequenceNumber, BoxedError> {
1145        self.inner
1146            .get_committed_sequence(region_id)
1147            .map_err(BoxedError::new)
1148    }
1149
1150    /// Retrieves the region's metadata.
1151    async fn get_metadata(
1152        &self,
1153        region_id: RegionId,
1154    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
1155        self.inner.get_metadata(region_id).map_err(BoxedError::new)
1156    }
1157
1158    /// Stops the engine.
1159    ///
1160    /// Stopping the engine doesn't stop the underlying log store, as other components might
1161    /// still use it. (When no other components are referencing the log store, it will
1162    /// automatically shut down.)
1163    async fn stop(&self) -> std::result::Result<(), BoxedError> {
1164        self.inner.stop().await.map_err(BoxedError::new)
1165    }
1166
1167    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
1168        self.get_region_statistic(region_id)
1169    }
1170
1171    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
1172        self.inner
1173            .set_region_role(region_id, role)
1174            .map_err(BoxedError::new)
1175    }
1176
1177    async fn set_region_role_state_gracefully(
1178        &self,
1179        region_id: RegionId,
1180        region_role_state: SettableRegionRoleState,
1181    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
1182        let _timer = HANDLE_REQUEST_ELAPSED
1183            .with_label_values(&["set_region_role_state_gracefully"])
1184            .start_timer();
1185
1186        self.inner
1187            .set_region_role_state_gracefully(region_id, region_role_state)
1188            .await
1189            .map_err(BoxedError::new)
1190    }
1191
1192    async fn sync_region(
1193        &self,
1194        region_id: RegionId,
1195        manifest_info: RegionManifestInfo,
1196    ) -> Result<SyncManifestResponse, BoxedError> {
1197        let (_, synced) = self
1198            .inner
1199            .sync_region(region_id, manifest_info)
1200            .await
1201            .map_err(BoxedError::new)?;
1202
1203        Ok(SyncManifestResponse::Mito { synced })
1204    }
1205
1206    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1207        self.inner.role(region_id)
1208    }
1209
1210    fn as_any(&self) -> &dyn Any {
1211        self
1212    }
1213}
1214
1215impl MitoEngine {
1216    fn handle_alter_response(
1217        &self,
1218        region_id: RegionId,
1219        response: &mut RegionResponse,
1220    ) -> Result<()> {
1221        if let Some(statistic) = self.region_statistic(region_id) {
1222            Self::encode_manifest_info_to_extensions(
1223                &region_id,
1224                statistic.manifest,
1225                &mut response.extensions,
1226            )?;
1227        }
1228        let column_metadatas = self
1229            .inner
1230            .find_region(region_id)
1231            .ok()
1232            .map(|r| r.metadata().column_metadatas.clone());
1233        if let Some(column_metadatas) = column_metadatas {
1234            Self::encode_column_metadatas_to_extensions(
1235                &region_id,
1236                column_metadatas,
1237                &mut response.extensions,
1238            )?;
1239        }
1240        Ok(())
1241    }
1242
1243    fn handle_create_response(
1244        &self,
1245        region_id: RegionId,
1246        response: &mut RegionResponse,
1247    ) -> Result<()> {
1248        let column_metadatas = self
1249            .inner
1250            .find_region(region_id)
1251            .ok()
1252            .map(|r| r.metadata().column_metadatas.clone());
1253        if let Some(column_metadatas) = column_metadatas {
1254            Self::encode_column_metadatas_to_extensions(
1255                &region_id,
1256                column_metadatas,
1257                &mut response.extensions,
1258            )?;
1259        }
1260        Ok(())
1261    }
1262}
1263
1264// Test-only methods.
1265#[cfg(any(test, feature = "test"))]
1266#[allow(clippy::too_many_arguments)]
1267impl MitoEngine {
1268    /// Returns a new [MitoEngine] for tests.
1269    pub async fn new_for_test<S: LogStore>(
1270        data_home: &str,
1271        mut config: MitoConfig,
1272        log_store: Arc<S>,
1273        object_store_manager: ObjectStoreManagerRef,
1274        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
1275        listener: Option<crate::engine::listener::EventListenerRef>,
1276        time_provider: crate::time_provider::TimeProviderRef,
1277        schema_metadata_manager: SchemaMetadataManagerRef,
1278        file_ref_manager: FileReferenceManagerRef,
1279        partition_expr_fetcher: PartitionExprFetcherRef,
1280    ) -> Result<MitoEngine> {
1281        config.sanitize(data_home)?;
1282
1283        let config = Arc::new(config);
1284        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
1285        let total_memory = get_total_memory_bytes().max(0) as u64;
1286        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
1287        let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
1288            .with_on_update(|usage| {
1289                SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
1290            })
1291            .with_on_reject(|| {
1292                SCAN_REQUESTS_REJECTED_TOTAL.inc();
1293            });
1294        Ok(MitoEngine {
1295            inner: Arc::new(EngineInner {
1296                workers: WorkerGroup::start_for_test(
1297                    config.clone(),
1298                    log_store,
1299                    object_store_manager,
1300                    write_buffer_manager,
1301                    listener,
1302                    schema_metadata_manager,
1303                    file_ref_manager,
1304                    time_provider,
1305                    partition_expr_fetcher,
1306                )
1307                .await?,
1308                config,
1309                wal_raw_entry_reader,
1310                scan_memory_tracker,
1311                #[cfg(feature = "enterprise")]
1312                extension_range_provider_factory: None,
1313            }),
1314        })
1315    }
1316
1317    /// Returns the purge scheduler.
1318    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
1319        self.inner.workers.purge_scheduler()
1320    }
1321}
1322
1323#[cfg(test)]
1324mod tests {
1325    use std::time::Duration;
1326
1327    use super::*;
1328    use crate::sst::file::FileMeta;
1329
1330    #[test]
1331    fn test_is_valid_region_edit() {
1332        // Valid: has only "files_to_add"
1333        let edit = RegionEdit {
1334            files_to_add: vec![FileMeta::default()],
1335            files_to_remove: vec![],
1336            timestamp_ms: None,
1337            compaction_time_window: None,
1338            flushed_entry_id: None,
1339            flushed_sequence: None,
1340            committed_sequence: None,
1341        };
1342        assert!(is_valid_region_edit(&edit));
1343
1344        // Invalid: "files_to_add" is empty
1345        let edit = RegionEdit {
1346            files_to_add: vec![],
1347            files_to_remove: vec![],
1348            timestamp_ms: None,
1349            compaction_time_window: None,
1350            flushed_entry_id: None,
1351            flushed_sequence: None,
1352            committed_sequence: None,
1353        };
1354        assert!(!is_valid_region_edit(&edit));
1355
1356        // Invalid: "files_to_remove" is not empty
1357        let edit = RegionEdit {
1358            files_to_add: vec![FileMeta::default()],
1359            files_to_remove: vec![FileMeta::default()],
1360            timestamp_ms: None,
1361            compaction_time_window: None,
1362            flushed_entry_id: None,
1363            flushed_sequence: None,
1364            committed_sequence: None,
1365        };
1366        assert!(!is_valid_region_edit(&edit));
1367
1368        // Invalid: other fields are not all "None"s
1369        let edit = RegionEdit {
1370            files_to_add: vec![FileMeta::default()],
1371            files_to_remove: vec![],
1372            timestamp_ms: None,
1373            compaction_time_window: Some(Duration::from_secs(1)),
1374            flushed_entry_id: None,
1375            flushed_sequence: None,
1376            committed_sequence: None,
1377        };
1378        assert!(!is_valid_region_edit(&edit));
1379        let edit = RegionEdit {
1380            files_to_add: vec![FileMeta::default()],
1381            files_to_remove: vec![],
1382            timestamp_ms: None,
1383            compaction_time_window: None,
1384            flushed_entry_id: Some(1),
1385            flushed_sequence: None,
1386            committed_sequence: None,
1387        };
1388        assert!(!is_valid_region_edit(&edit));
1389        let edit = RegionEdit {
1390            files_to_add: vec![FileMeta::default()],
1391            files_to_remove: vec![],
1392            timestamp_ms: None,
1393            compaction_time_window: None,
1394            flushed_entry_id: None,
1395            flushed_sequence: Some(1),
1396            committed_sequence: None,
1397        };
1398        assert!(!is_valid_region_edit(&edit));
1399    }
1400}