1#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34pub(crate) mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59#[cfg(test)]
60mod row_selector_test;
61#[cfg(test)]
62mod scan_corrupt;
63#[cfg(test)]
64mod scan_test;
65#[cfg(test)]
66mod set_role_state_test;
67#[cfg(test)]
68mod skip_wal_test;
69#[cfg(test)]
70mod staging_test;
71#[cfg(test)]
72mod sync_test;
73#[cfg(test)]
74mod truncate_test;
75
76#[cfg(test)]
77mod copy_region_from_test;
78#[cfg(test)]
79mod remap_manifests_test;
80
81#[cfg(test)]
82mod apply_staging_manifest_test;
83#[cfg(test)]
84mod partition_filter_test;
85mod puffin_index;
86
87use std::any::Any;
88use std::collections::{HashMap, HashSet};
89use std::sync::Arc;
90use std::time::Instant;
91
92use api::region::RegionResponse;
93use async_trait::async_trait;
94use common_base::Plugins;
95use common_error::ext::BoxedError;
96use common_meta::error::UnexpectedSnafu;
97use common_meta::key::SchemaMetadataManagerRef;
98use common_recordbatch::{QueryMemoryTracker, SendableRecordBatchStream};
99use common_stat::get_total_memory_bytes;
100use common_telemetry::{info, tracing, warn};
101use common_wal::options::WalOptions;
102use futures::future::{join_all, try_join_all};
103use futures::stream::{self, Stream, StreamExt};
104use object_store::manager::ObjectStoreManagerRef;
105use snafu::{OptionExt, ResultExt, ensure};
106use store_api::ManifestVersion;
107use store_api::codec::PrimaryKeyEncoding;
108use store_api::logstore::LogStore;
109use store_api::logstore::provider::{KafkaProvider, Provider};
110use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
111use store_api::metric_engine_consts::{
112 MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
113};
114use store_api::region_engine::{
115 BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine,
116 RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
117 RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
118 SyncRegionFromRequest, SyncRegionFromResponse,
119};
120use store_api::region_request::{
121 AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
122};
123use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
124use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
125use tokio::sync::{Semaphore, oneshot};
126
127use crate::access_layer::RegionFilePathFactory;
128use crate::cache::{CacheManagerRef, CacheStrategy};
129use crate::config::MitoConfig;
130use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
131use crate::error::{
132 InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
133 SerdeJsonSnafu, SerializeColumnMetadataSnafu,
134};
135#[cfg(feature = "enterprise")]
136use crate::extension::BoxedExtensionRangeProviderFactory;
137use crate::gc::GcLimiterRef;
138use crate::manifest::action::RegionEdit;
139use crate::memtable::MemtableStats;
140use crate::metrics::{
141 HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_EXHAUSTED_TOTAL, SCAN_MEMORY_USAGE_BYTES,
142 SCAN_REQUESTS_REJECTED_TOTAL,
143};
144use crate::read::scan_region::{ScanRegion, Scanner};
145use crate::read::stream::ScanBatchStream;
146use crate::region::MitoRegionRef;
147use crate::region::opener::PartitionExprFetcherRef;
148use crate::region::options::parse_wal_options;
149use crate::request::{RegionEditRequest, WorkerRequest};
150use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
151use crate::sst::file_ref::FileReferenceManagerRef;
152use crate::sst::index::intermediate::IntermediateManager;
153use crate::sst::index::puffin_manager::PuffinManagerFactory;
154use crate::wal::entry_distributor::{
155 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
156};
157use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
158use crate::worker::WorkerGroup;
159
160pub const MITO_ENGINE_NAME: &str = "mito";
161
162pub struct MitoEngineBuilder<'a, S: LogStore> {
163 data_home: &'a str,
164 config: MitoConfig,
165 log_store: Arc<S>,
166 object_store_manager: ObjectStoreManagerRef,
167 schema_metadata_manager: SchemaMetadataManagerRef,
168 file_ref_manager: FileReferenceManagerRef,
169 partition_expr_fetcher: PartitionExprFetcherRef,
170 plugins: Plugins,
171 #[cfg(feature = "enterprise")]
172 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
173}
174
175impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
176 #[allow(clippy::too_many_arguments)]
177 pub fn new(
178 data_home: &'a str,
179 config: MitoConfig,
180 log_store: Arc<S>,
181 object_store_manager: ObjectStoreManagerRef,
182 schema_metadata_manager: SchemaMetadataManagerRef,
183 file_ref_manager: FileReferenceManagerRef,
184 partition_expr_fetcher: PartitionExprFetcherRef,
185 plugins: Plugins,
186 ) -> Self {
187 Self {
188 data_home,
189 config,
190 log_store,
191 object_store_manager,
192 schema_metadata_manager,
193 file_ref_manager,
194 plugins,
195 partition_expr_fetcher,
196 #[cfg(feature = "enterprise")]
197 extension_range_provider_factory: None,
198 }
199 }
200
201 #[cfg(feature = "enterprise")]
202 #[must_use]
203 pub fn with_extension_range_provider_factory(
204 self,
205 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
206 ) -> Self {
207 Self {
208 extension_range_provider_factory,
209 ..self
210 }
211 }
212
213 pub async fn try_build(mut self) -> Result<MitoEngine> {
214 self.config.sanitize(self.data_home)?;
215
216 let config = Arc::new(self.config);
217 let workers = WorkerGroup::start(
218 config.clone(),
219 self.log_store.clone(),
220 self.object_store_manager,
221 self.schema_metadata_manager,
222 self.file_ref_manager,
223 self.partition_expr_fetcher.clone(),
224 self.plugins,
225 )
226 .await?;
227 let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
228 let total_memory = get_total_memory_bytes().max(0) as u64;
229 let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
230 let scan_memory_tracker =
231 QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
232 .on_update(|usage| {
233 SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
234 })
235 .on_exhausted(|| {
236 SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
237 })
238 .on_reject(|| {
239 SCAN_REQUESTS_REJECTED_TOTAL.inc();
240 })
241 .build();
242
243 let inner = EngineInner {
244 workers,
245 config,
246 wal_raw_entry_reader,
247 scan_memory_tracker,
248 #[cfg(feature = "enterprise")]
249 extension_range_provider_factory: None,
250 };
251
252 #[cfg(feature = "enterprise")]
253 let inner =
254 inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
255
256 Ok(MitoEngine {
257 inner: Arc::new(inner),
258 })
259 }
260}
261
262#[derive(Clone)]
264pub struct MitoEngine {
265 inner: Arc<EngineInner>,
266}
267
268impl MitoEngine {
269 #[allow(clippy::too_many_arguments)]
271 pub async fn new<S: LogStore>(
272 data_home: &str,
273 config: MitoConfig,
274 log_store: Arc<S>,
275 object_store_manager: ObjectStoreManagerRef,
276 schema_metadata_manager: SchemaMetadataManagerRef,
277 file_ref_manager: FileReferenceManagerRef,
278 partition_expr_fetcher: PartitionExprFetcherRef,
279 plugins: Plugins,
280 ) -> Result<MitoEngine> {
281 let builder = MitoEngineBuilder::new(
282 data_home,
283 config,
284 log_store,
285 object_store_manager,
286 schema_metadata_manager,
287 file_ref_manager,
288 partition_expr_fetcher,
289 plugins,
290 );
291 builder.try_build().await
292 }
293
294 pub fn mito_config(&self) -> &MitoConfig {
295 &self.inner.config
296 }
297
298 pub fn cache_manager(&self) -> CacheManagerRef {
299 self.inner.workers.cache_manager()
300 }
301
302 pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
303 self.inner.workers.file_ref_manager()
304 }
305
306 pub fn gc_limiter(&self) -> GcLimiterRef {
307 self.inner.workers.gc_limiter()
308 }
309
310 pub fn object_store_manager(&self) -> &ObjectStoreManagerRef {
311 self.inner.workers.object_store_manager()
312 }
313
314 pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
315 self.inner.workers.puffin_manager_factory()
316 }
317
318 pub fn intermediate_manager(&self) -> &IntermediateManager {
319 self.inner.workers.intermediate_manager()
320 }
321
322 pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
323 self.inner.workers.schema_metadata_manager()
324 }
325
326 pub async fn get_snapshot_of_file_refs(
328 &self,
329 file_handle_regions: impl IntoIterator<Item = RegionId>,
330 related_regions: HashMap<RegionId, HashSet<RegionId>>,
331 ) -> Result<FileRefsManifest> {
332 let file_ref_mgr = self.file_ref_manager();
333
334 let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
335 let query_regions: Vec<MitoRegionRef> = file_handle_regions
338 .into_iter()
339 .filter_map(|region_id| self.find_region(region_id))
340 .collect();
341
342 let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
343 let dst2src = related_regions
344 .into_iter()
345 .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
346 .fold(
347 HashMap::<RegionId, HashSet<RegionId>>::new(),
348 |mut acc, (k, v)| {
349 let entry = acc.entry(k).or_default();
350 entry.insert(v);
351 acc
352 },
353 );
354 let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
355 for (dst_region, srcs) in dst2src {
356 let Some(dst_region) = self.find_region(dst_region) else {
357 continue;
358 };
359 dst_region_to_src_regions.push((dst_region, srcs));
360 }
361 dst_region_to_src_regions
362 };
363
364 file_ref_mgr
365 .get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
366 .await
367 }
368
369 pub fn is_region_exists(&self, region_id: RegionId) -> bool {
371 self.inner.workers.is_region_exists(region_id)
372 }
373
374 pub fn is_region_opening(&self, region_id: RegionId) -> bool {
376 self.inner.workers.is_region_opening(region_id)
377 }
378
379 pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
381 self.inner.workers.is_region_catching_up(region_id)
382 }
383
384 pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
386 self.find_region(region_id)
387 .map(|region| region.region_statistic())
388 }
389
390 pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
392 self.find_region(region_id)
393 .map(|r| r.primary_key_encoding())
394 }
395
396 #[tracing::instrument(skip_all)]
401 pub async fn scan_to_stream(
402 &self,
403 region_id: RegionId,
404 request: ScanRequest,
405 ) -> Result<SendableRecordBatchStream, BoxedError> {
406 self.scanner(region_id, request)
407 .await
408 .map_err(BoxedError::new)?
409 .scan()
410 .await
411 }
412
413 pub async fn scan_batch(
415 &self,
416 region_id: RegionId,
417 request: ScanRequest,
418 filter_deleted: bool,
419 ) -> Result<ScanBatchStream> {
420 let mut scan_region = self.scan_region(region_id, request)?;
421 scan_region.set_filter_deleted(filter_deleted);
422 scan_region.scanner().await?.scan_batch()
423 }
424
425 pub(crate) async fn scanner(
427 &self,
428 region_id: RegionId,
429 request: ScanRequest,
430 ) -> Result<Scanner> {
431 self.scan_region(region_id, request)?.scanner().await
432 }
433
434 #[tracing::instrument(skip_all, fields(region_id = %region_id))]
436 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
437 self.inner.scan_region(region_id, request)
438 }
439
440 pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
445 let _timer = HANDLE_REQUEST_ELAPSED
446 .with_label_values(&["edit_region"])
447 .start_timer();
448
449 ensure!(
450 is_valid_region_edit(&edit),
451 InvalidRequestSnafu {
452 region_id,
453 reason: "invalid region edit"
454 }
455 );
456
457 let (tx, rx) = oneshot::channel();
458 let request = WorkerRequest::EditRegion(RegionEditRequest {
459 region_id,
460 edit,
461 tx,
462 });
463 self.inner
464 .workers
465 .submit_to_worker(region_id, request)
466 .await?;
467 rx.await.context(RecvSnafu)?
468 }
469
470 pub async fn copy_region_from(
474 &self,
475 region_id: RegionId,
476 request: MitoCopyRegionFromRequest,
477 ) -> Result<MitoCopyRegionFromResponse> {
478 self.inner.copy_region_from(region_id, request).await
479 }
480
481 #[cfg(test)]
482 pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
483 self.find_region(id)
484 }
485
486 pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
487 self.inner.workers.get_region(region_id)
488 }
489
490 pub fn regions(&self) -> Vec<MitoRegionRef> {
492 self.inner.workers.all_regions().collect()
493 }
494
495 fn encode_manifest_info_to_extensions(
496 region_id: &RegionId,
497 manifest_info: RegionManifestInfo,
498 extensions: &mut HashMap<String, Vec<u8>>,
499 ) -> Result<()> {
500 let region_manifest_info = vec![(*region_id, manifest_info)];
501
502 extensions.insert(
503 MANIFEST_INFO_EXTENSION_KEY.to_string(),
504 RegionManifestInfo::encode_list(®ion_manifest_info).context(SerdeJsonSnafu)?,
505 );
506 info!(
507 "Added manifest info: {:?} to extensions, region_id: {:?}",
508 region_manifest_info, region_id
509 );
510 Ok(())
511 }
512
513 fn encode_column_metadatas_to_extensions(
514 region_id: &RegionId,
515 column_metadatas: Vec<ColumnMetadata>,
516 extensions: &mut HashMap<String, Vec<u8>>,
517 ) -> Result<()> {
518 extensions.insert(
519 TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
520 ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
521 );
522 info!(
523 "Added column metadatas: {:?} to extensions, region_id: {:?}",
524 column_metadatas, region_id
525 );
526 Ok(())
527 }
528
529 pub fn find_memtable_and_sst_stats(
532 &self,
533 region_id: RegionId,
534 ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
535 let region = self
536 .find_region(region_id)
537 .context(RegionNotFoundSnafu { region_id })?;
538
539 let version = region.version();
540 let memtable_stats = version
541 .memtables
542 .list_memtables()
543 .iter()
544 .map(|x| x.stats())
545 .collect::<Vec<_>>();
546
547 let sst_stats = version
548 .ssts
549 .levels()
550 .iter()
551 .flat_map(|level| level.files().map(|x| x.meta_ref()))
552 .cloned()
553 .collect::<Vec<_>>();
554 Ok((memtable_stats, sst_stats))
555 }
556
557 pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
559 let node_id = self.inner.workers.file_ref_manager().node_id();
560 let regions = self.inner.workers.all_regions();
561
562 let mut results = Vec::new();
563 for region in regions {
564 let mut entries = region.manifest_sst_entries().await;
565 for e in &mut entries {
566 e.node_id = node_id;
567 }
568 results.extend(entries);
569 }
570
571 results
572 }
573
574 pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
576 let node_id = self.inner.workers.file_ref_manager().node_id();
577 let cache_manager = self.inner.workers.cache_manager();
578 let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
579 let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
580 let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
581
582 let mut results = Vec::new();
583
584 for region in self.inner.workers.all_regions() {
585 let manifest_entries = region.manifest_sst_entries().await;
586 let access_layer = region.access_layer.clone();
587 let table_dir = access_layer.table_dir().to_string();
588 let path_type = access_layer.path_type();
589 let object_store = access_layer.object_store().clone();
590 let puffin_factory = access_layer.puffin_manager_factory().clone();
591 let path_factory = RegionFilePathFactory::new(table_dir, path_type);
592
593 let entry_futures = manifest_entries.into_iter().map(|entry| {
594 let object_store = object_store.clone();
595 let path_factory = path_factory.clone();
596 let puffin_factory = puffin_factory.clone();
597 let puffin_metadata_cache = puffin_metadata_cache.clone();
598 let bloom_filter_cache = bloom_filter_cache.clone();
599 let inverted_index_cache = inverted_index_cache.clone();
600
601 async move {
602 let Some(index_file_path) = entry.index_file_path.as_ref() else {
603 return Vec::new();
604 };
605
606 let index_version = entry.index_version;
607 let file_id = match FileId::parse_str(&entry.file_id) {
608 Ok(file_id) => file_id,
609 Err(err) => {
610 warn!(
611 err;
612 "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
613 entry.table_dir,
614 entry.file_id
615 );
616 return Vec::new();
617 }
618 };
619 let region_index_id = RegionIndexId::new(
620 RegionFileId::new(entry.region_id, file_id),
621 index_version,
622 );
623 let context = IndexEntryContext {
624 table_dir: &entry.table_dir,
625 index_file_path: index_file_path.as_str(),
626 region_id: entry.region_id,
627 table_id: entry.table_id,
628 region_number: entry.region_number,
629 region_group: entry.region_group,
630 region_sequence: entry.region_sequence,
631 file_id: &entry.file_id,
632 index_file_size: entry.index_file_size,
633 node_id,
634 };
635
636 let manager = puffin_factory
637 .build(object_store, path_factory)
638 .with_puffin_metadata_cache(puffin_metadata_cache);
639
640 collect_index_entries_from_puffin(
641 manager,
642 region_index_id,
643 context,
644 bloom_filter_cache,
645 inverted_index_cache,
646 )
647 .await
648 }
649 });
650
651 let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); while let Some(mut metas) = meta_stream.next().await {
653 results.append(&mut metas);
654 }
655 }
656
657 results
658 }
659
660 pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
662 let node_id = self.inner.workers.file_ref_manager().node_id();
663 let regions = self.inner.workers.all_regions();
664
665 let mut layers_distinct_table_dirs = HashMap::new();
666 for region in regions {
667 let table_dir = region.access_layer.table_dir();
668 if !layers_distinct_table_dirs.contains_key(table_dir) {
669 layers_distinct_table_dirs
670 .insert(table_dir.to_string(), region.access_layer.clone());
671 }
672 }
673
674 stream::iter(layers_distinct_table_dirs)
675 .map(|(_, access_layer)| access_layer.storage_sst_entries())
676 .flatten()
677 .map(move |entry| {
678 entry.map(move |mut entry| {
679 entry.node_id = node_id;
680 entry
681 })
682 })
683 }
684}
685
686fn is_valid_region_edit(edit: &RegionEdit) -> bool {
690 (!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty())
691 && matches!(
692 edit,
693 RegionEdit {
694 files_to_add: _,
695 files_to_remove: _,
696 timestamp_ms: _,
697 compaction_time_window: None,
698 flushed_entry_id: None,
699 flushed_sequence: None,
700 ..
701 }
702 )
703}
704
705struct EngineInner {
707 workers: WorkerGroup,
709 config: Arc<MitoConfig>,
711 wal_raw_entry_reader: Arc<dyn RawEntryReader>,
713 scan_memory_tracker: QueryMemoryTracker,
715 #[cfg(feature = "enterprise")]
716 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
717}
718
719type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
720
721fn prepare_batch_open_requests(
723 requests: Vec<(RegionId, RegionOpenRequest)>,
724) -> Result<(
725 TopicGroupedRegionOpenRequests,
726 Vec<(RegionId, RegionOpenRequest)>,
727)> {
728 let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
729 let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
730 for (region_id, request) in requests {
731 match parse_wal_options(&request.options).context(SerdeJsonSnafu)? {
732 WalOptions::Kafka(options) => {
733 topic_to_regions
734 .entry(options.topic)
735 .or_default()
736 .push((region_id, request));
737 }
738 WalOptions::RaftEngine | WalOptions::Noop => {
739 remaining_regions.push((region_id, request));
740 }
741 }
742 }
743
744 Ok((topic_to_regions, remaining_regions))
745}
746
747impl EngineInner {
748 #[cfg(feature = "enterprise")]
749 #[must_use]
750 fn with_extension_range_provider_factory(
751 self,
752 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
753 ) -> Self {
754 Self {
755 extension_range_provider_factory,
756 ..self
757 }
758 }
759
760 async fn stop(&self) -> Result<()> {
762 self.workers.stop().await
763 }
764
765 fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
766 self.workers
767 .get_region(region_id)
768 .context(RegionNotFoundSnafu { region_id })
769 }
770
771 fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
775 let region = self.find_region(region_id)?;
777 Ok(region.metadata())
778 }
779
780 async fn open_topic_regions(
781 &self,
782 topic: String,
783 region_requests: Vec<(RegionId, RegionOpenRequest)>,
784 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
785 let now = Instant::now();
786 let region_ids = region_requests
787 .iter()
788 .map(|(region_id, _)| *region_id)
789 .collect::<Vec<_>>();
790 let provider = Provider::kafka_provider(topic);
791 let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
792 provider.clone(),
793 self.wal_raw_entry_reader.clone(),
794 ®ion_ids,
795 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
796 );
797
798 let mut responses = Vec::with_capacity(region_requests.len());
799 for ((region_id, request), entry_receiver) in
800 region_requests.into_iter().zip(entry_receivers)
801 {
802 let (request, receiver) =
803 WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
804 self.workers.submit_to_worker(region_id, request).await?;
805 responses.push(async move { receiver.await.context(RecvSnafu)? });
806 }
807
808 let distribution =
810 common_runtime::spawn_global(async move { distributor.distribute().await });
811 let responses = join_all(responses).await;
813 distribution.await.context(JoinSnafu)??;
814
815 let num_failure = responses.iter().filter(|r| r.is_err()).count();
816 info!(
817 "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
818 region_ids.len() - num_failure,
819 provider.as_kafka_provider().unwrap(),
821 num_failure,
822 now.elapsed(),
823 );
824 Ok(region_ids.into_iter().zip(responses).collect())
825 }
826
827 async fn handle_batch_open_requests(
828 &self,
829 parallelism: usize,
830 requests: Vec<(RegionId, RegionOpenRequest)>,
831 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
832 let semaphore = Arc::new(Semaphore::new(parallelism));
833 let (topic_to_region_requests, remaining_region_requests) =
834 prepare_batch_open_requests(requests)?;
835 let mut responses =
836 Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
837
838 if !topic_to_region_requests.is_empty() {
839 let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
840 for (topic, region_requests) in topic_to_region_requests {
841 let semaphore_moved = semaphore.clone();
842 tasks.push(async move {
843 let _permit = semaphore_moved.acquire().await.unwrap();
845 self.open_topic_regions(topic, region_requests).await
846 })
847 }
848 let r = try_join_all(tasks).await?;
849 responses.extend(r.into_iter().flatten());
850 }
851
852 if !remaining_region_requests.is_empty() {
853 let mut tasks = Vec::with_capacity(remaining_region_requests.len());
854 let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
855 for (region_id, request) in remaining_region_requests {
856 let semaphore_moved = semaphore.clone();
857 region_ids.push(region_id);
858 tasks.push(async move {
859 let _permit = semaphore_moved.acquire().await.unwrap();
861 let (request, receiver) =
862 WorkerRequest::new_open_region_request(region_id, request, None);
863
864 self.workers.submit_to_worker(region_id, request).await?;
865
866 receiver.await.context(RecvSnafu)?
867 })
868 }
869
870 let results = join_all(tasks).await;
871 responses.extend(region_ids.into_iter().zip(results));
872 }
873
874 Ok(responses)
875 }
876
877 async fn catchup_topic_regions(
878 &self,
879 provider: Provider,
880 region_requests: Vec<(RegionId, RegionCatchupRequest)>,
881 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
882 let now = Instant::now();
883 let region_ids = region_requests
884 .iter()
885 .map(|(region_id, _)| *region_id)
886 .collect::<Vec<_>>();
887 let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
888 provider.clone(),
889 self.wal_raw_entry_reader.clone(),
890 ®ion_ids,
891 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
892 );
893
894 let mut responses = Vec::with_capacity(region_requests.len());
895 for ((region_id, request), entry_receiver) in
896 region_requests.into_iter().zip(entry_receivers)
897 {
898 let (request, receiver) =
899 WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
900 self.workers.submit_to_worker(region_id, request).await?;
901 responses.push(async move { receiver.await.context(RecvSnafu)? });
902 }
903
904 let distribution =
906 common_runtime::spawn_global(async move { distributor.distribute().await });
907 let responses = join_all(responses).await;
909 distribution.await.context(JoinSnafu)??;
910
911 let num_failure = responses.iter().filter(|r| r.is_err()).count();
912 info!(
913 "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
914 region_ids.len() - num_failure,
915 provider.as_kafka_provider().unwrap(),
917 num_failure,
918 now.elapsed(),
919 );
920
921 Ok(region_ids.into_iter().zip(responses).collect())
922 }
923
924 async fn handle_batch_catchup_requests(
925 &self,
926 parallelism: usize,
927 requests: Vec<(RegionId, RegionCatchupRequest)>,
928 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
929 let mut responses = Vec::with_capacity(requests.len());
930 let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
931 let mut remaining_region_requests = vec![];
932
933 for (region_id, request) in requests {
934 match self.workers.get_region(region_id) {
935 Some(region) => match region.provider.as_kafka_provider() {
936 Some(provider) => {
937 topic_regions
938 .entry(provider.clone())
939 .or_default()
940 .push((region_id, request));
941 }
942 None => {
943 remaining_region_requests.push((region_id, request));
944 }
945 },
946 None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
947 }
948 }
949
950 let semaphore = Arc::new(Semaphore::new(parallelism));
951
952 if !topic_regions.is_empty() {
953 let mut tasks = Vec::with_capacity(topic_regions.len());
954 for (provider, region_requests) in topic_regions {
955 let semaphore_moved = semaphore.clone();
956 tasks.push(async move {
957 let _permit = semaphore_moved.acquire().await.unwrap();
959 self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
960 .await
961 })
962 }
963
964 let r = try_join_all(tasks).await?;
965 responses.extend(r.into_iter().flatten());
966 }
967
968 if !remaining_region_requests.is_empty() {
969 let mut tasks = Vec::with_capacity(remaining_region_requests.len());
970 let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
971 for (region_id, request) in remaining_region_requests {
972 let semaphore_moved = semaphore.clone();
973 region_ids.push(region_id);
974 tasks.push(async move {
975 let _permit = semaphore_moved.acquire().await.unwrap();
977 let (request, receiver) =
978 WorkerRequest::new_catchup_region_request(region_id, request, None);
979
980 self.workers.submit_to_worker(region_id, request).await?;
981
982 receiver.await.context(RecvSnafu)?
983 })
984 }
985
986 let results = join_all(tasks).await;
987 responses.extend(region_ids.into_iter().zip(results));
988 }
989
990 Ok(responses)
991 }
992
993 async fn handle_request(
995 &self,
996 region_id: RegionId,
997 request: RegionRequest,
998 ) -> Result<AffectedRows> {
999 let region_metadata = self.get_metadata(region_id).ok();
1000 let (request, receiver) =
1001 WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
1002 self.workers.submit_to_worker(region_id, request).await?;
1003
1004 receiver.await.context(RecvSnafu)?
1005 }
1006
1007 fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
1009 self.find_region(region_id)
1011 .map(|r| r.find_committed_sequence())
1012 }
1013
1014 #[tracing::instrument(skip_all, fields(region_id = %region_id))]
1016 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
1017 let query_start = Instant::now();
1018 let region = self.find_region(region_id)?;
1020 let version = region.version();
1021 let cache_manager = self.workers.cache_manager();
1023
1024 let scan_region = ScanRegion::new(
1025 version,
1026 region.access_layer.clone(),
1027 request,
1028 CacheStrategy::EnableAll(cache_manager),
1029 )
1030 .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
1031 .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
1032 .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
1033 .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
1034 .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
1035 .with_start_time(query_start);
1036
1037 #[cfg(feature = "enterprise")]
1038 let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
1039
1040 Ok(scan_region)
1041 }
1042
1043 #[cfg(feature = "enterprise")]
1044 fn maybe_fill_extension_range_provider(
1045 &self,
1046 mut scan_region: ScanRegion,
1047 region: MitoRegionRef,
1048 ) -> ScanRegion {
1049 if region.is_follower()
1050 && let Some(factory) = self.extension_range_provider_factory.as_ref()
1051 {
1052 scan_region
1053 .set_extension_range_provider(factory.create_extension_range_provider(region));
1054 }
1055 scan_region
1056 }
1057
1058 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
1060 let region = self.find_region(region_id)?;
1061 region.set_role(role);
1062 Ok(())
1063 }
1064
1065 async fn set_region_role_state_gracefully(
1067 &self,
1068 region_id: RegionId,
1069 region_role_state: SettableRegionRoleState,
1070 ) -> Result<SetRegionRoleStateResponse> {
1071 let (request, receiver) =
1074 WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
1075 self.workers.submit_to_worker(region_id, request).await?;
1076
1077 receiver.await.context(RecvSnafu)
1078 }
1079
1080 async fn sync_region(
1081 &self,
1082 region_id: RegionId,
1083 manifest_info: RegionManifestInfo,
1084 ) -> Result<(ManifestVersion, bool)> {
1085 ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
1086 let manifest_version = manifest_info.data_manifest_version();
1087 let (request, receiver) =
1088 WorkerRequest::new_sync_region_request(region_id, manifest_version);
1089 self.workers.submit_to_worker(region_id, request).await?;
1090
1091 receiver.await.context(RecvSnafu)?
1092 }
1093
1094 async fn remap_manifests(
1095 &self,
1096 request: RemapManifestsRequest,
1097 ) -> Result<RemapManifestsResponse> {
1098 let region_id = request.region_id;
1099 let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
1100 self.workers.submit_to_worker(region_id, request).await?;
1101 let manifest_paths = receiver.await.context(RecvSnafu)??;
1102 Ok(RemapManifestsResponse { manifest_paths })
1103 }
1104
1105 async fn copy_region_from(
1106 &self,
1107 region_id: RegionId,
1108 request: MitoCopyRegionFromRequest,
1109 ) -> Result<MitoCopyRegionFromResponse> {
1110 let (request, receiver) =
1111 WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
1112 self.workers.submit_to_worker(region_id, request).await?;
1113 let response = receiver.await.context(RecvSnafu)??;
1114 Ok(response)
1115 }
1116
1117 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1118 self.workers.get_region(region_id).map(|region| {
1119 if region.is_follower() {
1120 RegionRole::Follower
1121 } else {
1122 RegionRole::Leader
1123 }
1124 })
1125 }
1126}
1127
1128fn map_batch_responses(responses: Vec<(RegionId, Result<AffectedRows>)>) -> BatchResponses {
1129 responses
1130 .into_iter()
1131 .map(|(region_id, response)| {
1132 (
1133 region_id,
1134 response.map(RegionResponse::new).map_err(BoxedError::new),
1135 )
1136 })
1137 .collect()
1138}
1139
1140#[async_trait]
1141impl RegionEngine for MitoEngine {
1142 fn name(&self) -> &str {
1143 MITO_ENGINE_NAME
1144 }
1145
1146 #[tracing::instrument(skip_all)]
1147 async fn handle_batch_open_requests(
1148 &self,
1149 parallelism: usize,
1150 requests: Vec<(RegionId, RegionOpenRequest)>,
1151 ) -> Result<BatchResponses, BoxedError> {
1152 self.inner
1154 .handle_batch_open_requests(parallelism, requests)
1155 .await
1156 .map(map_batch_responses)
1157 .map_err(BoxedError::new)
1158 }
1159
1160 #[tracing::instrument(skip_all)]
1161 async fn handle_batch_catchup_requests(
1162 &self,
1163 parallelism: usize,
1164 requests: Vec<(RegionId, RegionCatchupRequest)>,
1165 ) -> Result<BatchResponses, BoxedError> {
1166 self.inner
1167 .handle_batch_catchup_requests(parallelism, requests)
1168 .await
1169 .map(map_batch_responses)
1170 .map_err(BoxedError::new)
1171 }
1172
1173 #[tracing::instrument(skip_all)]
1174 async fn handle_request(
1175 &self,
1176 region_id: RegionId,
1177 request: RegionRequest,
1178 ) -> Result<RegionResponse, BoxedError> {
1179 let _timer = HANDLE_REQUEST_ELAPSED
1180 .with_label_values(&[request.request_type()])
1181 .start_timer();
1182
1183 let is_alter = matches!(request, RegionRequest::Alter(_));
1184 let is_create = matches!(request, RegionRequest::Create(_));
1185 let mut response = self
1186 .inner
1187 .handle_request(region_id, request)
1188 .await
1189 .map(RegionResponse::new)
1190 .map_err(BoxedError::new)?;
1191
1192 if is_alter {
1193 self.handle_alter_response(region_id, &mut response)
1194 .map_err(BoxedError::new)?;
1195 } else if is_create {
1196 self.handle_create_response(region_id, &mut response)
1197 .map_err(BoxedError::new)?;
1198 }
1199
1200 Ok(response)
1201 }
1202
1203 #[tracing::instrument(skip_all)]
1204 async fn handle_query(
1205 &self,
1206 region_id: RegionId,
1207 request: ScanRequest,
1208 ) -> Result<RegionScannerRef, BoxedError> {
1209 self.scan_region(region_id, request)
1210 .map_err(BoxedError::new)?
1211 .region_scanner()
1212 .await
1213 .map_err(BoxedError::new)
1214 }
1215
1216 fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
1217 Some(self.inner.scan_memory_tracker.clone())
1218 }
1219
1220 async fn get_committed_sequence(
1221 &self,
1222 region_id: RegionId,
1223 ) -> Result<SequenceNumber, BoxedError> {
1224 self.inner
1225 .get_committed_sequence(region_id)
1226 .map_err(BoxedError::new)
1227 }
1228
1229 async fn get_metadata(
1231 &self,
1232 region_id: RegionId,
1233 ) -> std::result::Result<RegionMetadataRef, BoxedError> {
1234 self.inner.get_metadata(region_id).map_err(BoxedError::new)
1235 }
1236
1237 async fn stop(&self) -> std::result::Result<(), BoxedError> {
1243 self.inner.stop().await.map_err(BoxedError::new)
1244 }
1245
1246 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
1247 self.get_region_statistic(region_id)
1248 }
1249
1250 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
1251 self.inner
1252 .set_region_role(region_id, role)
1253 .map_err(BoxedError::new)
1254 }
1255
1256 async fn set_region_role_state_gracefully(
1257 &self,
1258 region_id: RegionId,
1259 region_role_state: SettableRegionRoleState,
1260 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
1261 let _timer = HANDLE_REQUEST_ELAPSED
1262 .with_label_values(&["set_region_role_state_gracefully"])
1263 .start_timer();
1264
1265 self.inner
1266 .set_region_role_state_gracefully(region_id, region_role_state)
1267 .await
1268 .map_err(BoxedError::new)
1269 }
1270
1271 async fn sync_region(
1272 &self,
1273 region_id: RegionId,
1274 request: SyncRegionFromRequest,
1275 ) -> Result<SyncRegionFromResponse, BoxedError> {
1276 let manifest_info = request
1277 .into_region_manifest_info()
1278 .context(UnexpectedSnafu {
1279 err_msg: "Expected a manifest info request",
1280 })
1281 .map_err(BoxedError::new)?;
1282 let (_, synced) = self
1283 .inner
1284 .sync_region(region_id, manifest_info)
1285 .await
1286 .map_err(BoxedError::new)?;
1287
1288 Ok(SyncRegionFromResponse::Mito { synced })
1289 }
1290
1291 async fn remap_manifests(
1292 &self,
1293 request: RemapManifestsRequest,
1294 ) -> Result<RemapManifestsResponse, BoxedError> {
1295 self.inner
1296 .remap_manifests(request)
1297 .await
1298 .map_err(BoxedError::new)
1299 }
1300
1301 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1302 self.inner.role(region_id)
1303 }
1304
1305 fn as_any(&self) -> &dyn Any {
1306 self
1307 }
1308}
1309
1310impl MitoEngine {
1311 fn handle_alter_response(
1312 &self,
1313 region_id: RegionId,
1314 response: &mut RegionResponse,
1315 ) -> Result<()> {
1316 if let Some(statistic) = self.region_statistic(region_id) {
1317 Self::encode_manifest_info_to_extensions(
1318 ®ion_id,
1319 statistic.manifest,
1320 &mut response.extensions,
1321 )?;
1322 }
1323 let column_metadatas = self
1324 .inner
1325 .find_region(region_id)
1326 .ok()
1327 .map(|r| r.metadata().column_metadatas.clone());
1328 if let Some(column_metadatas) = column_metadatas {
1329 Self::encode_column_metadatas_to_extensions(
1330 ®ion_id,
1331 column_metadatas,
1332 &mut response.extensions,
1333 )?;
1334 }
1335 Ok(())
1336 }
1337
1338 fn handle_create_response(
1339 &self,
1340 region_id: RegionId,
1341 response: &mut RegionResponse,
1342 ) -> Result<()> {
1343 let column_metadatas = self
1344 .inner
1345 .find_region(region_id)
1346 .ok()
1347 .map(|r| r.metadata().column_metadatas.clone());
1348 if let Some(column_metadatas) = column_metadatas {
1349 Self::encode_column_metadatas_to_extensions(
1350 ®ion_id,
1351 column_metadatas,
1352 &mut response.extensions,
1353 )?;
1354 }
1355 Ok(())
1356 }
1357}
1358
1359#[cfg(any(test, feature = "test"))]
1361#[allow(clippy::too_many_arguments)]
1362impl MitoEngine {
1363 pub async fn new_for_test<S: LogStore>(
1365 data_home: &str,
1366 mut config: MitoConfig,
1367 log_store: Arc<S>,
1368 object_store_manager: ObjectStoreManagerRef,
1369 write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
1370 listener: Option<crate::engine::listener::EventListenerRef>,
1371 time_provider: crate::time_provider::TimeProviderRef,
1372 schema_metadata_manager: SchemaMetadataManagerRef,
1373 file_ref_manager: FileReferenceManagerRef,
1374 partition_expr_fetcher: PartitionExprFetcherRef,
1375 ) -> Result<MitoEngine> {
1376 config.sanitize(data_home)?;
1377
1378 let config = Arc::new(config);
1379 let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
1380 let total_memory = get_total_memory_bytes().max(0) as u64;
1381 let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
1382 let scan_memory_tracker =
1383 QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
1384 .on_update(|usage| {
1385 SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
1386 })
1387 .on_exhausted(|| {
1388 SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
1389 })
1390 .on_reject(|| {
1391 SCAN_REQUESTS_REJECTED_TOTAL.inc();
1392 })
1393 .build();
1394 Ok(MitoEngine {
1395 inner: Arc::new(EngineInner {
1396 workers: WorkerGroup::start_for_test(
1397 config.clone(),
1398 log_store,
1399 object_store_manager,
1400 write_buffer_manager,
1401 listener,
1402 schema_metadata_manager,
1403 file_ref_manager,
1404 time_provider,
1405 partition_expr_fetcher,
1406 )
1407 .await?,
1408 config,
1409 wal_raw_entry_reader,
1410 scan_memory_tracker,
1411 #[cfg(feature = "enterprise")]
1412 extension_range_provider_factory: None,
1413 }),
1414 })
1415 }
1416
1417 pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
1419 self.inner.workers.purge_scheduler()
1420 }
1421}
1422
1423#[cfg(test)]
1424mod tests {
1425 use std::time::Duration;
1426
1427 use super::*;
1428 use crate::sst::file::FileMeta;
1429
1430 #[test]
1431 fn test_is_valid_region_edit() {
1432 let edit = RegionEdit {
1434 files_to_add: vec![FileMeta::default()],
1435 files_to_remove: vec![],
1436 timestamp_ms: None,
1437 compaction_time_window: None,
1438 flushed_entry_id: None,
1439 flushed_sequence: None,
1440 committed_sequence: None,
1441 };
1442 assert!(is_valid_region_edit(&edit));
1443
1444 let edit = RegionEdit {
1446 files_to_add: vec![],
1447 files_to_remove: vec![],
1448 timestamp_ms: None,
1449 compaction_time_window: None,
1450 flushed_entry_id: None,
1451 flushed_sequence: None,
1452 committed_sequence: None,
1453 };
1454 assert!(!is_valid_region_edit(&edit));
1455
1456 let edit = RegionEdit {
1458 files_to_add: vec![FileMeta::default()],
1459 files_to_remove: vec![FileMeta::default()],
1460 timestamp_ms: None,
1461 compaction_time_window: None,
1462 flushed_entry_id: None,
1463 flushed_sequence: None,
1464 committed_sequence: None,
1465 };
1466 assert!(is_valid_region_edit(&edit));
1467
1468 let edit = RegionEdit {
1470 files_to_add: vec![FileMeta::default()],
1471 files_to_remove: vec![],
1472 timestamp_ms: None,
1473 compaction_time_window: Some(Duration::from_secs(1)),
1474 flushed_entry_id: None,
1475 flushed_sequence: None,
1476 committed_sequence: None,
1477 };
1478 assert!(!is_valid_region_edit(&edit));
1479 let edit = RegionEdit {
1480 files_to_add: vec![FileMeta::default()],
1481 files_to_remove: vec![],
1482 timestamp_ms: None,
1483 compaction_time_window: None,
1484 flushed_entry_id: Some(1),
1485 flushed_sequence: None,
1486 committed_sequence: None,
1487 };
1488 assert!(!is_valid_region_edit(&edit));
1489 let edit = RegionEdit {
1490 files_to_add: vec![FileMeta::default()],
1491 files_to_remove: vec![],
1492 timestamp_ms: None,
1493 compaction_time_window: None,
1494 flushed_entry_id: None,
1495 flushed_sequence: Some(1),
1496 committed_sequence: None,
1497 };
1498 assert!(!is_valid_region_edit(&edit));
1499 }
1500}