#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_catchup_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod bump_committed_sequence_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
pub(crate) mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
mod index_build_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_corrupt;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod skip_wal_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

#[cfg(test)]
mod copy_region_from_test;
#[cfg(test)]
mod remap_manifests_test;

#[cfg(test)]
mod apply_staging_manifest_test;
#[cfg(test)]
mod partition_filter_test;
mod puffin_index;

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::error::UnexpectedSnafu;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
use common_stat::get_total_memory_bytes;
use common_telemetry::{info, tracing, warn};
use common_wal::options::WalOptions;
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
    BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine,
    RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
    SyncRegionFromRequest, SyncRegionFromResponse,
};
use store_api::region_request::{
    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::access_layer::RegionFilePathFactory;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::{
    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::region::options::parse_wal_options;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::wal::entry_distributor::{
    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

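/// Name of the mito region engine.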
pub const MITO_ENGINE_NAME: &str = "mito";

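/// Builder for [MitoEngine].
///
/// A minimal construction sketch; the component handles (log store, object
/// store manager, and so on) are assumed to be created elsewhere:
///
/// ```ignore
/// let engine = MitoEngineBuilder::new(
///     data_home,
///     config,
///     log_store,
///     object_store_manager,
///     schema_metadata_manager,
///     file_ref_manager,
///     partition_expr_fetcher,
///     plugins,
///     max_concurrent_queries,
/// )
/// .try_build()
/// .await?;
/// ```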
pub struct MitoEngineBuilder<'a, S: LogStore> {
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    plugins: Plugins,
    max_concurrent_queries: usize,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
        max_concurrent_queries: usize,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            partition_expr_fetcher,
            max_concurrent_queries,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher.clone(),
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker =
            QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
                .with_on_update(|usage| {
                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
                })
                .with_on_reject(|| {
                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
                });

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            scan_memory_tracker,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}

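/// Region engine implementation for the mito storage engine.
///
/// Cloning the engine is cheap: all clones share the same inner state.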
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
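    /// Creates a new [MitoEngine] from the given components via
    /// [MitoEngineBuilder], passing `0` for `max_concurrent_queries`.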
    #[allow(clippy::too_many_arguments)]
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
            plugins,
            0,
        );
        builder.try_build().await
    }

    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }

    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }

    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }

    pub fn gc_limiter(&self) -> GcLimiterRef {
        self.inner.workers.gc_limiter()
    }

    pub fn object_store_manager(&self) -> &ObjectStoreManagerRef {
        self.inner.workers.object_store_manager()
    }

    pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
        self.inner.workers.puffin_manager_factory()
    }

    pub fn intermediate_manager(&self) -> &IntermediateManager {
        self.inner.workers.intermediate_manager()
    }

    pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
        self.inner.workers.schema_metadata_manager()
    }

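    /// Takes a snapshot of the current file references for the given regions.
    ///
    /// `related_regions` maps a source region to the destination regions that
    /// may hold references to its files; the mapping is inverted so that each
    /// destination region found on this engine is paired with its source
    /// regions. Regions not found on this engine are skipped.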
    pub async fn get_snapshot_of_file_refs(
        &self,
        file_handle_regions: impl IntoIterator<Item = RegionId>,
        related_regions: HashMap<RegionId, HashSet<RegionId>>,
    ) -> Result<FileRefsManifest> {
        let file_ref_mgr = self.file_ref_manager();

        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
        let query_regions: Vec<MitoRegionRef> = file_handle_regions
            .into_iter()
            .filter_map(|region_id| self.find_region(region_id))
            .collect();

        let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
            let dst2src = related_regions
                .into_iter()
                .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
                .fold(
                    HashMap::<RegionId, HashSet<RegionId>>::new(),
                    |mut acc, (k, v)| {
                        let entry = acc.entry(k).or_default();
                        entry.insert(v);
                        acc
                    },
                );
            let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
            for (dst_region, srcs) in dst2src {
                let Some(dst_region) = self.find_region(dst_region) else {
                    continue;
                };
                dst_region_to_src_regions.push((dst_region, srcs));
            }
            dst_region_to_src_regions
        };

        file_ref_mgr
            .get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
            .await
    }

    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_catching_up(region_id)
    }

    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

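    /// Handles the scan `request` and returns a stream of record batches.
    ///
    /// An illustrative sketch (assumes an `engine` handle and a valid
    /// `region_id` are in scope):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let mut stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// while let Some(batch) = stream.try_next().await? {
    ///     // Process each record batch as it arrives.
    /// }
    /// ```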
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

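    /// Scans a region and returns a stream of scan batches. When
    /// `filter_deleted` is `false`, deleted rows are kept in the output.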
    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    pub(crate) async fn scanner(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

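    /// Applies a [RegionEdit] directly to the region and waits for it to be
    /// committed. Only edits that add or remove files are accepted; see
    /// `is_valid_region_edit`.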
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    pub async fn copy_region_from(
        &self,
        region_id: RegionId,
        request: MitoCopyRegionFromRequest,
    ) -> Result<MitoCopyRegionFromResponse> {
        self.inner.copy_region_from(region_id, request).await
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    pub fn regions(&self) -> Vec<MitoRegionRef> {
        self.inner.workers.all_regions().collect()
    }

    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

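    /// Returns the memtable stats and the SST file metadata of the current
    /// version of the region, or an error if the region is not found.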
    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }

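    /// Lists the SST entries recorded in the manifests of all regions on this
    /// node, tagging each entry with the local node id.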
    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut results = Vec::new();
        for region in regions {
            let mut entries = region.manifest_sst_entries().await;
            for e in &mut entries {
                e.node_id = node_id;
            }
            results.extend(entries);
        }

        results
    }

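    /// Collects puffin index metadata entries for all regions, reading up to
    /// 8 index files concurrently and skipping SSTs without an index file or
    /// with an unparsable file id.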
    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let cache_manager = self.inner.workers.cache_manager();
        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();

        let mut results = Vec::new();

        for region in self.inner.workers.all_regions() {
            let manifest_entries = region.manifest_sst_entries().await;
            let access_layer = region.access_layer.clone();
            let table_dir = access_layer.table_dir().to_string();
            let path_type = access_layer.path_type();
            let object_store = access_layer.object_store().clone();
            let puffin_factory = access_layer.puffin_manager_factory().clone();
            let path_factory = RegionFilePathFactory::new(table_dir, path_type);

            let entry_futures = manifest_entries.into_iter().map(|entry| {
                let object_store = object_store.clone();
                let path_factory = path_factory.clone();
                let puffin_factory = puffin_factory.clone();
                let puffin_metadata_cache = puffin_metadata_cache.clone();
                let bloom_filter_cache = bloom_filter_cache.clone();
                let inverted_index_cache = inverted_index_cache.clone();

                async move {
                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
                        return Vec::new();
                    };

                    let index_version = entry.index_version;
                    let file_id = match FileId::parse_str(&entry.file_id) {
                        Ok(file_id) => file_id,
                        Err(err) => {
                            warn!(
                                err;
                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                                entry.table_dir,
                                entry.file_id
                            );
                            return Vec::new();
                        }
                    };
                    let region_index_id = RegionIndexId::new(
                        RegionFileId::new(entry.region_id, file_id),
                        index_version,
                    );
                    let context = IndexEntryContext {
                        table_dir: &entry.table_dir,
                        index_file_path: index_file_path.as_str(),
                        region_id: entry.region_id,
                        table_id: entry.table_id,
                        region_number: entry.region_number,
                        region_group: entry.region_group,
                        region_sequence: entry.region_sequence,
                        file_id: &entry.file_id,
                        index_file_size: entry.index_file_size,
                        node_id,
                    };

                    let manager = puffin_factory
                        .build(object_store, path_factory)
                        .with_puffin_metadata_cache(puffin_metadata_cache);

                    collect_index_entries_from_puffin(
                        manager,
                        region_index_id,
                        context,
                        bloom_filter_cache,
                        inverted_index_cache,
                    )
                    .await
                }
            });

            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8);
            while let Some(mut metas) = meta_stream.next().await {
                results.append(&mut metas);
            }
        }

        results
    }

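    /// Lists SST entries found by scanning the storage layer directly,
    /// visiting each distinct table directory once.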
    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut layers_distinct_table_dirs = HashMap::new();
        for region in regions {
            let table_dir = region.access_layer.table_dir();
            if !layers_distinct_table_dirs.contains_key(table_dir) {
                layers_distinct_table_dirs
                    .insert(table_dir.to_string(), region.access_layer.clone());
            }
        }

        stream::iter(layers_distinct_table_dirs)
            .map(|(_, access_layer)| access_layer.storage_sst_entries())
            .flatten()
            .map(move |entry| {
                entry.map(move |mut entry| {
                    entry.node_id = node_id;
                    entry
                })
            })
    }
}

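/// Returns true if the region edit is valid: it must add or remove at least
/// one file, and must not carry compaction or flush related fields
/// (`timestamp_ms` is allowed).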
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    (!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty())
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                timestamp_ms: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                ..
            }
        )
}

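/// Inner state of [MitoEngine], shared by all of its clones.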
struct EngineInner {
    workers: WorkerGroup,
    config: Arc<MitoConfig>,
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    scan_memory_tracker: QueryMemoryTracker,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

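/// Splits open requests into Kafka-WAL regions grouped by topic and the
/// remaining raft-engine or noop regions.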
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        match parse_wal_options(&request.options).context(SerdeJsonSnafu)? {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

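    /// Opens all regions that share a Kafka `topic`, replaying their WAL
    /// entries through a single entry distributor.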
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }

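    /// Opens regions in batch, grouping Kafka-WAL regions by topic and
    /// bounding concurrency with a semaphore of `parallelism` permits.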
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

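    /// Replays WAL entries for all regions sharing a Kafka topic through a
    /// single entry distributor so the regions catch up with the latest data.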
    async fn catchup_topic_regions(
        &self,
        provider: Provider,
        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );

        Ok(region_ids.into_iter().zip(responses).collect())
    }

    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let mut responses = Vec::with_capacity(requests.len());
        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
        let mut remaining_region_requests = vec![];

        for (region_id, request) in requests {
            match self.workers.get_region(region_id) {
                Some(region) => match region.provider.as_kafka_provider() {
                    Some(provider) => {
                        topic_regions
                            .entry(provider.clone())
                            .or_default()
                            .push((region_id, request));
                    }
                    None => {
                        remaining_region_requests.push((region_id, request));
                    }
                },
                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
            }
        }

        let semaphore = Arc::new(Semaphore::new(parallelism));

        if !topic_regions.is_empty() {
            let mut tasks = Vec::with_capacity(topic_regions.len());
            for (provider, region_requests) in topic_regions {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
                        .await
                })
            }

            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_catchup_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
        self.find_region(region_id)
            .map(|r| r.find_committed_sequence())
    }

    #[tracing::instrument(skip_all, fields(region_id = %region_id))]
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        let region = self.find_region(region_id)?;
        let version = region.version();
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    async fn remap_manifests(
        &self,
        request: RemapManifestsRequest,
    ) -> Result<RemapManifestsResponse> {
        let region_id = request.region_id;
        let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
        self.workers.submit_to_worker(region_id, request).await?;
        let manifest_paths = receiver.await.context(RecvSnafu)??;
        Ok(RemapManifestsResponse { manifest_paths })
    }

    async fn copy_region_from(
        &self,
        region_id: RegionId,
        request: MitoCopyRegionFromRequest,
    ) -> Result<MitoCopyRegionFromResponse> {
        let (request, receiver) =
            WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
        self.workers.submit_to_worker(region_id, request).await?;
        let response = receiver.await.context(RecvSnafu)??;
        Ok(response)
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

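/// Converts per-region results into [BatchResponses], boxing errors.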
fn map_batch_responses(responses: Vec<(RegionId, Result<AffectedRows>)>) -> BatchResponses {
    responses
        .into_iter()
        .map(|(region_id, response)| {
            (
                region_id,
                response.map(RegionResponse::new).map_err(BoxedError::new),
            )
        })
        .collect()
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(map_batch_responses)
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
            .map(map_batch_responses)
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
        Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
    }

    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError> {
        self.inner
            .get_committed_sequence(region_id)
            .map_err(BoxedError::new)
    }

    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        request: SyncRegionFromRequest,
    ) -> Result<SyncRegionFromResponse, BoxedError> {
        let manifest_info = request
            .into_region_manifest_info()
            .context(UnexpectedSnafu {
                err_msg: "Expected a manifest info request",
            })
            .map_err(BoxedError::new)?;
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncRegionFromResponse::Mito { synced })
    }

    async fn remap_manifests(
        &self,
        request: RemapManifestsRequest,
    ) -> Result<RemapManifestsResponse, BoxedError> {
        self.inner
            .remap_manifests(request)
            .await
            .map_err(BoxedError::new)
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl MitoEngine {
    fn handle_alter_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        if let Some(statistic) = self.region_statistic(region_id) {
            Self::encode_manifest_info_to_extensions(
                &region_id,
                statistic.manifest,
                &mut response.extensions,
            )?;
        }
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }

    fn handle_create_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }
}

#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
            .with_on_update(|usage| {
                SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
            })
            .with_on_reject(|| {
                SCAN_REQUESTS_REJECTED_TOTAL.inc();
            });
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    file_ref_manager,
                    time_provider,
                    partition_expr_fetcher,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                scan_memory_tracker,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
    }
}