1#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34pub(crate) mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59pub mod region_hook;
60#[cfg(test)]
61mod row_selector_test;
62#[cfg(test)]
63mod scan_corrupt;
64#[cfg(test)]
65mod scan_test;
66#[cfg(test)]
67mod set_role_state_test;
68#[cfg(test)]
69mod skip_wal_test;
70#[cfg(test)]
71mod staging_test;
72#[cfg(test)]
73mod sync_test;
74#[cfg(test)]
75mod truncate_test;
76
77#[cfg(test)]
78mod copy_region_from_test;
79#[cfg(test)]
80mod remap_manifests_test;
81
82#[cfg(test)]
83mod apply_staging_manifest_test;
84#[cfg(test)]
85mod partition_filter_test;
86mod puffin_index;
87
88use std::any::Any;
89use std::collections::{HashMap, HashSet};
90use std::sync::Arc;
91use std::time::Instant;
92
93use api::region::RegionResponse;
94use async_trait::async_trait;
95use common_base::Plugins;
96use common_error::ext::BoxedError;
97use common_meta::error::UnexpectedSnafu;
98use common_meta::key::SchemaMetadataManagerRef;
99use common_recordbatch::{QueryMemoryTracker, SendableRecordBatchStream};
100use common_stat::get_total_memory_bytes;
101use common_telemetry::{info, tracing, warn};
102use common_wal::options::WalOptions;
103use futures::future::{join_all, try_join_all};
104use futures::stream::{self, Stream, StreamExt};
105use object_store::manager::ObjectStoreManagerRef;
106use snafu::{OptionExt, ResultExt, ensure};
107use store_api::ManifestVersion;
108use store_api::codec::PrimaryKeyEncoding;
109use store_api::logstore::LogStore;
110use store_api::logstore::provider::{KafkaProvider, Provider};
111use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
112use store_api::metric_engine_consts::{
113 MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
114};
115use store_api::region_engine::{
116 BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine,
117 RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
118 RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
119 SyncRegionFromRequest, SyncRegionFromResponse,
120};
121use store_api::region_info::RegionInfoEntry;
122use store_api::region_request::{
123 AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
124};
125use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
126use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
127use tokio::sync::{Semaphore, oneshot};
128
129use crate::access_layer::RegionFilePathFactory;
130use crate::cache::{CacheManagerRef, CacheStrategy};
131use crate::config::MitoConfig;
132use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
133use crate::error::{
134 IncrementalQueryStaleSnafu, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu,
135 RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu,
136 SnapshotFenceStaleSnafu,
137};
138#[cfg(feature = "enterprise")]
139use crate::extension::BoxedExtensionRangeProviderFactory;
140use crate::gc::GcLimiterRef;
141use crate::manifest::action::RegionEdit;
142use crate::memtable::MemtableStats;
143use crate::metrics::{
144 HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_EXHAUSTED_TOTAL, SCAN_MEMORY_USAGE_BYTES,
145 SCAN_REQUESTS_REJECTED_TOTAL,
146};
147use crate::read::scan_region::{ScanRegion, Scanner};
148use crate::read::stream::ScanBatchStream;
149use crate::region::MitoRegionRef;
150use crate::region::opener::PartitionExprFetcherRef;
151use crate::region::options::parse_wal_options;
152use crate::request::{RegionEditRequest, WorkerRequest};
153use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
154use crate::sst::file_ref::FileReferenceManagerRef;
155use crate::sst::index::intermediate::IntermediateManager;
156use crate::sst::index::puffin_manager::PuffinManagerFactory;
157use crate::wal::entry_distributor::{
158 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
159};
160use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
161use crate::worker::WorkerGroup;
162
/// Name of the mito region engine.
pub const MITO_ENGINE_NAME: &str = "mito";
164
/// Builder for [`MitoEngine`] that collects the dependencies needed to start the engine.
///
/// Call [`MitoEngineBuilder::try_build`] to sanitize the config and start the workers.
pub struct MitoEngineBuilder<'a, S: LogStore> {
    /// Data home directory; passed to `MitoConfig::sanitize` in `try_build`.
    data_home: &'a str,
    /// Engine configuration; sanitized and wrapped in an `Arc` in `try_build`.
    config: MitoConfig,
    /// Log store handed to the workers and used to build the raw WAL entry reader.
    log_store: Arc<S>,
    /// Object store manager handed to the workers.
    object_store_manager: ObjectStoreManagerRef,
    /// Schema metadata manager handed to the workers.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// File reference manager handed to the workers.
    file_ref_manager: FileReferenceManagerRef,
    /// Partition expression fetcher handed to the workers.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Plugins handed to the workers.
    plugins: Plugins,
    /// Optional extension range provider factory; `None` unless set through
    /// `with_extension_range_provider_factory`.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
177
178impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
179 #[allow(clippy::too_many_arguments)]
180 pub fn new(
181 data_home: &'a str,
182 config: MitoConfig,
183 log_store: Arc<S>,
184 object_store_manager: ObjectStoreManagerRef,
185 schema_metadata_manager: SchemaMetadataManagerRef,
186 file_ref_manager: FileReferenceManagerRef,
187 partition_expr_fetcher: PartitionExprFetcherRef,
188 plugins: Plugins,
189 ) -> Self {
190 Self {
191 data_home,
192 config,
193 log_store,
194 object_store_manager,
195 schema_metadata_manager,
196 file_ref_manager,
197 plugins,
198 partition_expr_fetcher,
199 #[cfg(feature = "enterprise")]
200 extension_range_provider_factory: None,
201 }
202 }
203
204 #[cfg(feature = "enterprise")]
205 #[must_use]
206 pub fn with_extension_range_provider_factory(
207 self,
208 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
209 ) -> Self {
210 Self {
211 extension_range_provider_factory,
212 ..self
213 }
214 }
215
216 pub async fn try_build(mut self) -> Result<MitoEngine> {
217 self.config.sanitize(self.data_home)?;
218
219 let config = Arc::new(self.config);
220 let workers = WorkerGroup::start(
221 config.clone(),
222 self.log_store.clone(),
223 self.object_store_manager,
224 self.schema_metadata_manager,
225 self.file_ref_manager,
226 self.partition_expr_fetcher.clone(),
227 self.plugins,
228 )
229 .await?;
230 let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
231 let total_memory = get_total_memory_bytes().max(0) as u64;
232 let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
233 let scan_memory_tracker =
234 QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
235 .on_update(|usage| {
236 SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
237 })
238 .on_exhausted(|| {
239 SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
240 })
241 .on_reject(|| {
242 SCAN_REQUESTS_REJECTED_TOTAL.inc();
243 })
244 .build();
245
246 let inner = EngineInner {
247 workers,
248 config,
249 wal_raw_entry_reader,
250 scan_memory_tracker,
251 #[cfg(feature = "enterprise")]
252 extension_range_provider_factory: None,
253 };
254
255 #[cfg(feature = "enterprise")]
256 let inner =
257 inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
258
259 Ok(MitoEngine {
260 inner: Arc::new(inner),
261 })
262 }
263}
264
/// The mito region engine.
///
/// Cloning is cheap: all state lives behind a shared [`Arc`].
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}
270
271impl MitoEngine {
272 #[allow(clippy::too_many_arguments)]
274 pub async fn new<S: LogStore>(
275 data_home: &str,
276 config: MitoConfig,
277 log_store: Arc<S>,
278 object_store_manager: ObjectStoreManagerRef,
279 schema_metadata_manager: SchemaMetadataManagerRef,
280 file_ref_manager: FileReferenceManagerRef,
281 partition_expr_fetcher: PartitionExprFetcherRef,
282 plugins: Plugins,
283 ) -> Result<MitoEngine> {
284 let builder = MitoEngineBuilder::new(
285 data_home,
286 config,
287 log_store,
288 object_store_manager,
289 schema_metadata_manager,
290 file_ref_manager,
291 partition_expr_fetcher,
292 plugins,
293 );
294 builder.try_build().await
295 }
296
297 pub fn mito_config(&self) -> &MitoConfig {
298 &self.inner.config
299 }
300
301 pub fn cache_manager(&self) -> CacheManagerRef {
302 self.inner.workers.cache_manager()
303 }
304
305 pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
306 self.inner.workers.file_ref_manager()
307 }
308
309 pub fn gc_limiter(&self) -> GcLimiterRef {
310 self.inner.workers.gc_limiter()
311 }
312
313 pub fn object_store_manager(&self) -> &ObjectStoreManagerRef {
314 self.inner.workers.object_store_manager()
315 }
316
317 pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
318 self.inner.workers.puffin_manager_factory()
319 }
320
321 pub fn intermediate_manager(&self) -> &IntermediateManager {
322 self.inner.workers.intermediate_manager()
323 }
324
325 pub fn schema_metadata_manager(&self) -> &SchemaMetadataManagerRef {
326 self.inner.workers.schema_metadata_manager()
327 }
328
329 pub async fn get_snapshot_of_file_refs(
331 &self,
332 file_handle_regions: impl IntoIterator<Item = RegionId>,
333 related_regions: HashMap<RegionId, HashSet<RegionId>>,
334 ) -> Result<FileRefsManifest> {
335 let file_ref_mgr = self.file_ref_manager();
336
337 let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
338 let query_regions: Vec<MitoRegionRef> = file_handle_regions
341 .into_iter()
342 .filter_map(|region_id| self.find_region(region_id))
343 .collect();
344
345 let dst_region_to_src_regions: Vec<(MitoRegionRef, HashSet<RegionId>)> = {
346 let dst2src = related_regions
347 .into_iter()
348 .flat_map(|(src, dsts)| dsts.into_iter().map(move |dst| (dst, src)))
349 .fold(
350 HashMap::<RegionId, HashSet<RegionId>>::new(),
351 |mut acc, (k, v)| {
352 let entry = acc.entry(k).or_default();
353 entry.insert(v);
354 acc
355 },
356 );
357 let mut dst_region_to_src_regions = Vec::with_capacity(dst2src.len());
358 for (dst_region, srcs) in dst2src {
359 let Some(dst_region) = self.find_region(dst_region) else {
360 continue;
361 };
362 dst_region_to_src_regions.push((dst_region, srcs));
363 }
364 dst_region_to_src_regions
365 };
366
367 file_ref_mgr
368 .get_snapshot_of_file_refs(query_regions, dst_region_to_src_regions)
369 .await
370 }
371
372 pub fn is_region_exists(&self, region_id: RegionId) -> bool {
374 self.inner.workers.is_region_exists(region_id)
375 }
376
377 pub fn is_region_opening(&self, region_id: RegionId) -> bool {
379 self.inner.workers.is_region_opening(region_id)
380 }
381
382 pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
384 self.inner.workers.is_region_catching_up(region_id)
385 }
386
387 pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
389 self.find_region(region_id)
390 .map(|region| region.region_statistic())
391 }
392
393 pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
395 self.find_region(region_id)
396 .map(|r| r.primary_key_encoding())
397 }
398
399 #[tracing::instrument(skip_all)]
404 pub async fn scan_to_stream(
405 &self,
406 region_id: RegionId,
407 request: ScanRequest,
408 ) -> Result<SendableRecordBatchStream, BoxedError> {
409 self.scanner(region_id, request)
410 .await
411 .map_err(BoxedError::new)?
412 .scan()
413 .await
414 }
415
416 pub async fn scan_batch(
418 &self,
419 region_id: RegionId,
420 request: ScanRequest,
421 filter_deleted: bool,
422 ) -> Result<ScanBatchStream> {
423 let mut scan_region = self.scan_region(region_id, request)?;
424 scan_region.set_filter_deleted(filter_deleted);
425 scan_region.scanner().await?.scan_batch()
426 }
427
428 pub(crate) async fn scanner(
430 &self,
431 region_id: RegionId,
432 request: ScanRequest,
433 ) -> Result<Scanner> {
434 self.scan_region(region_id, request)?.scanner().await
435 }
436
437 #[tracing::instrument(skip_all, fields(region_id = %region_id))]
439 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
440 self.inner.scan_region(region_id, request)
441 }
442
443 pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
448 let _timer = HANDLE_REQUEST_ELAPSED
449 .with_label_values(&["edit_region"])
450 .start_timer();
451
452 ensure!(
453 is_valid_region_edit(&edit),
454 InvalidRequestSnafu {
455 region_id,
456 reason: "invalid region edit"
457 }
458 );
459
460 let (tx, rx) = oneshot::channel();
461 let request = WorkerRequest::EditRegion(RegionEditRequest::new(region_id, edit, true, tx));
462 self.inner
463 .workers
464 .submit_to_worker(region_id, request)
465 .await?;
466 rx.await.context(RecvSnafu)?
467 }
468
469 pub async fn copy_region_from(
473 &self,
474 region_id: RegionId,
475 request: MitoCopyRegionFromRequest,
476 ) -> Result<MitoCopyRegionFromResponse> {
477 self.inner.copy_region_from(region_id, request).await
478 }
479
480 #[cfg(test)]
481 pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
482 self.find_region(id)
483 }
484
485 pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
486 self.inner.workers.get_region(region_id)
487 }
488
489 pub fn regions(&self) -> Vec<MitoRegionRef> {
491 self.inner.workers.all_regions().collect()
492 }
493
494 fn encode_manifest_info_to_extensions(
495 region_id: &RegionId,
496 manifest_info: RegionManifestInfo,
497 extensions: &mut HashMap<String, Vec<u8>>,
498 ) -> Result<()> {
499 let region_manifest_info = vec![(*region_id, manifest_info)];
500
501 extensions.insert(
502 MANIFEST_INFO_EXTENSION_KEY.to_string(),
503 RegionManifestInfo::encode_list(®ion_manifest_info).context(SerdeJsonSnafu)?,
504 );
505 info!(
506 "Added manifest info: {:?} to extensions, region_id: {:?}",
507 region_manifest_info, region_id
508 );
509 Ok(())
510 }
511
512 fn encode_column_metadatas_to_extensions(
513 region_id: &RegionId,
514 column_metadatas: Vec<ColumnMetadata>,
515 extensions: &mut HashMap<String, Vec<u8>>,
516 ) -> Result<()> {
517 extensions.insert(
518 TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
519 ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
520 );
521 info!(
522 "Added column metadatas: {:?} to extensions, region_id: {:?}",
523 column_metadatas, region_id
524 );
525 Ok(())
526 }
527
528 pub fn find_memtable_and_sst_stats(
531 &self,
532 region_id: RegionId,
533 ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
534 let region = self
535 .find_region(region_id)
536 .context(RegionNotFoundSnafu { region_id })?;
537
538 let version = region.version();
539 let memtable_stats = version
540 .memtables
541 .list_memtables()
542 .iter()
543 .map(|x| x.stats())
544 .collect::<Vec<_>>();
545
546 let sst_stats = version
547 .ssts
548 .levels()
549 .iter()
550 .flat_map(|level| level.files().map(|x| x.meta_ref()))
551 .cloned()
552 .collect::<Vec<_>>();
553 Ok((memtable_stats, sst_stats))
554 }
555
556 pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
558 let node_id = self.inner.workers.file_ref_manager().node_id();
559 let regions = self.inner.workers.all_regions();
560
561 let mut results = Vec::new();
562 for region in regions {
563 let mut entries = region.manifest_sst_entries().await;
564 for e in &mut entries {
565 e.node_id = node_id;
566 }
567 results.extend(entries);
568 }
569
570 results
571 }
572
573 pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
575 let node_id = self.inner.workers.file_ref_manager().node_id();
576 let cache_manager = self.inner.workers.cache_manager();
577 let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
578 let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
579 let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
580
581 let mut results = Vec::new();
582
583 for region in self.inner.workers.all_regions() {
584 let manifest_entries = region.manifest_sst_entries().await;
585 let access_layer = region.access_layer.clone();
586 let table_dir = access_layer.table_dir().to_string();
587 let path_type = access_layer.path_type();
588 let object_store = access_layer.object_store().clone();
589 let puffin_factory = access_layer.puffin_manager_factory().clone();
590 let path_factory = RegionFilePathFactory::new(table_dir, path_type);
591
592 let entry_futures = manifest_entries.into_iter().map(|entry| {
593 let object_store = object_store.clone();
594 let path_factory = path_factory.clone();
595 let puffin_factory = puffin_factory.clone();
596 let puffin_metadata_cache = puffin_metadata_cache.clone();
597 let bloom_filter_cache = bloom_filter_cache.clone();
598 let inverted_index_cache = inverted_index_cache.clone();
599
600 async move {
601 let Some(index_file_path) = entry.index_file_path.as_ref() else {
602 return Vec::new();
603 };
604
605 let index_version = entry.index_version;
606 let file_id = match FileId::parse_str(&entry.file_id) {
607 Ok(file_id) => file_id,
608 Err(err) => {
609 warn!(
610 err;
611 "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
612 entry.table_dir,
613 entry.file_id
614 );
615 return Vec::new();
616 }
617 };
618 let region_index_id = RegionIndexId::new(
621 RegionFileId::new(entry.origin_region_id, file_id),
622 index_version,
623 );
624 let context = IndexEntryContext {
625 table_dir: &entry.table_dir,
626 index_file_path: index_file_path.as_str(),
627 region_id: entry.region_id,
628 table_id: entry.table_id,
629 region_number: entry.region_number,
630 region_group: entry.region_group,
631 region_sequence: entry.region_sequence,
632 file_id: &entry.file_id,
633 index_file_size: entry.index_file_size,
634 node_id,
635 };
636
637 let manager = puffin_factory
638 .build(object_store, path_factory)
639 .with_puffin_metadata_cache(puffin_metadata_cache);
640
641 collect_index_entries_from_puffin(
642 manager,
643 region_index_id,
644 context,
645 bloom_filter_cache,
646 inverted_index_cache,
647 )
648 .await
649 }
650 });
651
652 let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); while let Some(mut metas) = meta_stream.next().await {
654 results.append(&mut metas);
655 }
656 }
657
658 results
659 }
660
661 pub async fn all_region_infos(&self) -> Vec<RegionInfoEntry> {
663 let node_id = self.inner.workers.file_ref_manager().node_id();
664 self.inner
665 .workers
666 .all_regions()
667 .map(|region| region.region_info_entry(node_id))
668 .collect()
669 }
670
671 pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
673 let node_id = self.inner.workers.file_ref_manager().node_id();
674 let regions = self.inner.workers.all_regions();
675
676 let mut layers_distinct_table_dirs = HashMap::new();
677 for region in regions {
678 let table_dir = region.access_layer.table_dir();
679 if !layers_distinct_table_dirs.contains_key(table_dir) {
680 layers_distinct_table_dirs
681 .insert(table_dir.to_string(), region.access_layer.clone());
682 }
683 }
684
685 stream::iter(layers_distinct_table_dirs)
686 .map(|(_, access_layer)| access_layer.storage_sst_entries())
687 .flatten()
688 .map(move |entry| {
689 entry.map(move |mut entry| {
690 entry.node_id = node_id;
691 entry
692 })
693 })
694 }
695}
696
697fn is_valid_region_edit(edit: &RegionEdit) -> bool {
701 (!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty())
702 && matches!(
703 edit,
704 RegionEdit {
705 files_to_add: _,
706 files_to_remove: _,
707 timestamp_ms: _,
708 compaction_time_window: None,
709 flushed_entry_id: None,
710 flushed_sequence: None,
711 ..
712 }
713 )
714}
715
/// Shared state behind [`MitoEngine`].
struct EngineInner {
    /// Region workers; requests are handed to them via `submit_to_worker`.
    workers: WorkerGroup,
    /// Engine configuration, also shared with the worker group.
    config: Arc<MitoConfig>,
    /// Raw WAL entry reader used when opening or catching up regions grouped by Kafka topic.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    /// Memory tracker built from `scan_memory_limit` and `scan_memory_on_exhausted`.
    scan_memory_tracker: QueryMemoryTracker,
    /// Optional factory whose providers are attached to scans of follower regions.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
729
/// Region open requests grouped by their Kafka WAL topic.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
731
732fn prepare_batch_open_requests(
734 requests: Vec<(RegionId, RegionOpenRequest)>,
735) -> Result<(
736 TopicGroupedRegionOpenRequests,
737 Vec<(RegionId, RegionOpenRequest)>,
738)> {
739 let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
740 let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
741 for (region_id, request) in requests {
742 match parse_wal_options(&request.options).context(SerdeJsonSnafu)? {
743 WalOptions::Kafka(options) => {
744 topic_to_regions
745 .entry(options.topic)
746 .or_default()
747 .push((region_id, request));
748 }
749 WalOptions::RaftEngine | WalOptions::Noop => {
750 remaining_regions.push((region_id, request));
751 }
752 }
753 }
754
755 Ok((topic_to_regions, remaining_regions))
756}
757
758impl EngineInner {
759 #[cfg(feature = "enterprise")]
760 #[must_use]
761 fn with_extension_range_provider_factory(
762 self,
763 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
764 ) -> Self {
765 Self {
766 extension_range_provider_factory,
767 ..self
768 }
769 }
770
771 async fn stop(&self) -> Result<()> {
773 self.workers.stop().await
774 }
775
776 fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
777 self.workers
778 .get_region(region_id)
779 .context(RegionNotFoundSnafu { region_id })
780 }
781
782 fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
786 let region = self.find_region(region_id)?;
788 Ok(region.metadata())
789 }
790
791 async fn open_topic_regions(
792 &self,
793 topic: String,
794 region_requests: Vec<(RegionId, RegionOpenRequest)>,
795 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
796 let now = Instant::now();
797 let region_ids = region_requests
798 .iter()
799 .map(|(region_id, _)| *region_id)
800 .collect::<Vec<_>>();
801 let provider = Provider::kafka_provider(topic);
802 let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
803 provider.clone(),
804 self.wal_raw_entry_reader.clone(),
805 ®ion_ids,
806 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
807 );
808
809 let mut responses = Vec::with_capacity(region_requests.len());
810 for ((region_id, request), entry_receiver) in
811 region_requests.into_iter().zip(entry_receivers)
812 {
813 let (request, receiver) =
814 WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
815 self.workers.submit_to_worker(region_id, request).await?;
816 responses.push(async move { receiver.await.context(RecvSnafu)? });
817 }
818
819 let distribution =
821 common_runtime::spawn_global(async move { distributor.distribute().await });
822 let responses = join_all(responses).await;
824 distribution.await.context(JoinSnafu)??;
825
826 let num_failure = responses.iter().filter(|r| r.is_err()).count();
827 info!(
828 "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
829 region_ids.len() - num_failure,
830 provider.as_kafka_provider().unwrap(),
832 num_failure,
833 now.elapsed(),
834 );
835 Ok(region_ids.into_iter().zip(responses).collect())
836 }
837
838 async fn handle_batch_open_requests(
839 &self,
840 parallelism: usize,
841 requests: Vec<(RegionId, RegionOpenRequest)>,
842 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
843 let semaphore = Arc::new(Semaphore::new(parallelism));
844 let (topic_to_region_requests, remaining_region_requests) =
845 prepare_batch_open_requests(requests)?;
846 let mut responses =
847 Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
848
849 if !topic_to_region_requests.is_empty() {
850 let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
851 for (topic, region_requests) in topic_to_region_requests {
852 let semaphore_moved = semaphore.clone();
853 tasks.push(async move {
854 let _permit = semaphore_moved.acquire().await.unwrap();
856 self.open_topic_regions(topic, region_requests).await
857 })
858 }
859 let r = try_join_all(tasks).await?;
860 responses.extend(r.into_iter().flatten());
861 }
862
863 if !remaining_region_requests.is_empty() {
864 let mut tasks = Vec::with_capacity(remaining_region_requests.len());
865 let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
866 for (region_id, request) in remaining_region_requests {
867 let semaphore_moved = semaphore.clone();
868 region_ids.push(region_id);
869 tasks.push(async move {
870 let _permit = semaphore_moved.acquire().await.unwrap();
872 let (request, receiver) =
873 WorkerRequest::new_open_region_request(region_id, request, None);
874
875 self.workers.submit_to_worker(region_id, request).await?;
876
877 receiver.await.context(RecvSnafu)?
878 })
879 }
880
881 let results = join_all(tasks).await;
882 responses.extend(region_ids.into_iter().zip(results));
883 }
884
885 Ok(responses)
886 }
887
888 async fn catchup_topic_regions(
889 &self,
890 provider: Provider,
891 region_requests: Vec<(RegionId, RegionCatchupRequest)>,
892 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
893 let now = Instant::now();
894 let region_ids = region_requests
895 .iter()
896 .map(|(region_id, _)| *region_id)
897 .collect::<Vec<_>>();
898 let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
899 provider.clone(),
900 self.wal_raw_entry_reader.clone(),
901 ®ion_ids,
902 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
903 );
904
905 let mut responses = Vec::with_capacity(region_requests.len());
906 for ((region_id, request), entry_receiver) in
907 region_requests.into_iter().zip(entry_receivers)
908 {
909 let (request, receiver) =
910 WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
911 self.workers.submit_to_worker(region_id, request).await?;
912 responses.push(async move { receiver.await.context(RecvSnafu)? });
913 }
914
915 let distribution =
917 common_runtime::spawn_global(async move { distributor.distribute().await });
918 let responses = join_all(responses).await;
920 distribution.await.context(JoinSnafu)??;
921
922 let num_failure = responses.iter().filter(|r| r.is_err()).count();
923 info!(
924 "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
925 region_ids.len() - num_failure,
926 provider.as_kafka_provider().unwrap(),
928 num_failure,
929 now.elapsed(),
930 );
931
932 Ok(region_ids.into_iter().zip(responses).collect())
933 }
934
935 async fn handle_batch_catchup_requests(
936 &self,
937 parallelism: usize,
938 requests: Vec<(RegionId, RegionCatchupRequest)>,
939 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
940 let mut responses = Vec::with_capacity(requests.len());
941 let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
942 let mut remaining_region_requests = vec![];
943
944 for (region_id, request) in requests {
945 match self.workers.get_region(region_id) {
946 Some(region) => match region.provider.as_kafka_provider() {
947 Some(provider) => {
948 topic_regions
949 .entry(provider.clone())
950 .or_default()
951 .push((region_id, request));
952 }
953 None => {
954 remaining_region_requests.push((region_id, request));
955 }
956 },
957 None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
958 }
959 }
960
961 let semaphore = Arc::new(Semaphore::new(parallelism));
962
963 if !topic_regions.is_empty() {
964 let mut tasks = Vec::with_capacity(topic_regions.len());
965 for (provider, region_requests) in topic_regions {
966 let semaphore_moved = semaphore.clone();
967 tasks.push(async move {
968 let _permit = semaphore_moved.acquire().await.unwrap();
970 self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
971 .await
972 })
973 }
974
975 let r = try_join_all(tasks).await?;
976 responses.extend(r.into_iter().flatten());
977 }
978
979 if !remaining_region_requests.is_empty() {
980 let mut tasks = Vec::with_capacity(remaining_region_requests.len());
981 let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
982 for (region_id, request) in remaining_region_requests {
983 let semaphore_moved = semaphore.clone();
984 region_ids.push(region_id);
985 tasks.push(async move {
986 let _permit = semaphore_moved.acquire().await.unwrap();
988 let (request, receiver) =
989 WorkerRequest::new_catchup_region_request(region_id, request, None);
990
991 self.workers.submit_to_worker(region_id, request).await?;
992
993 receiver.await.context(RecvSnafu)?
994 })
995 }
996
997 let results = join_all(tasks).await;
998 responses.extend(region_ids.into_iter().zip(results));
999 }
1000
1001 Ok(responses)
1002 }
1003
1004 async fn handle_request(
1006 &self,
1007 region_id: RegionId,
1008 request: RegionRequest,
1009 ) -> Result<AffectedRows> {
1010 let region_metadata = self.get_metadata(region_id).ok();
1011 let (request, receiver) =
1012 WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
1013 self.workers.submit_to_worker(region_id, request).await?;
1014
1015 receiver.await.context(RecvSnafu)?
1016 }
1017
1018 fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
1020 self.find_region(region_id)
1022 .map(|r| r.find_committed_sequence())
1023 }
1024
1025 #[tracing::instrument(skip_all, fields(region_id = %region_id))]
1027 fn scan_region(&self, region_id: RegionId, mut request: ScanRequest) -> Result<ScanRegion> {
1028 let query_start = Instant::now();
1029 let region = self.find_region(region_id)?;
1031 let version_data = region.version_control.current();
1032 let version = version_data.version;
1033
1034 if request.snapshot_on_scan && request.memtable_max_sequence.is_none() {
1035 request.memtable_max_sequence = Some(version_data.committed_sequence);
1036 }
1037
1038 if let Some(given_seq) = request.memtable_min_sequence {
1039 let min_readable_seq = version.flushed_sequence;
1040 ensure!(
1041 given_seq >= min_readable_seq,
1042 IncrementalQueryStaleSnafu {
1043 region_id,
1044 given_seq,
1045 min_readable_seq,
1046 }
1047 );
1048 }
1049
1050 if let Some(given_seq) = request.memtable_max_sequence
1051 && !request.skip_sst_files
1052 {
1053 let min_enforceable_seq = version.flushed_sequence;
1060 ensure!(
1061 given_seq >= min_enforceable_seq,
1062 SnapshotFenceStaleSnafu {
1063 region_id,
1064 given_seq,
1065 min_enforceable_seq,
1066 }
1067 );
1068 }
1069
1070 let cache_manager = self.workers.cache_manager();
1072
1073 let scan_region = ScanRegion::new(
1074 version,
1075 region.access_layer.clone(),
1076 request,
1077 CacheStrategy::EnableAll(cache_manager),
1078 )
1079 .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
1080 .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
1081 .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
1082 .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
1083 .with_start_time(query_start);
1084
1085 #[cfg(feature = "enterprise")]
1086 let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
1087
1088 Ok(scan_region)
1089 }
1090
1091 #[cfg(feature = "enterprise")]
1092 fn maybe_fill_extension_range_provider(
1093 &self,
1094 mut scan_region: ScanRegion,
1095 region: MitoRegionRef,
1096 ) -> ScanRegion {
1097 if region.is_follower()
1098 && let Some(factory) = self.extension_range_provider_factory.as_ref()
1099 {
1100 scan_region
1101 .set_extension_range_provider(factory.create_extension_range_provider(region));
1102 }
1103 scan_region
1104 }
1105
1106 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
1108 let region = self.find_region(region_id)?;
1109 region.set_role(role);
1110 Ok(())
1111 }
1112
1113 async fn set_region_role_state_gracefully(
1115 &self,
1116 region_id: RegionId,
1117 region_role_state: SettableRegionRoleState,
1118 ) -> Result<SetRegionRoleStateResponse> {
1119 let (request, receiver) =
1122 WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
1123 self.workers.submit_to_worker(region_id, request).await?;
1124
1125 receiver.await.context(RecvSnafu)
1126 }
1127
1128 async fn sync_region(
1129 &self,
1130 region_id: RegionId,
1131 manifest_info: RegionManifestInfo,
1132 ) -> Result<(ManifestVersion, bool)> {
1133 ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
1134 let manifest_version = manifest_info.data_manifest_version();
1135 let (request, receiver) =
1136 WorkerRequest::new_sync_region_request(region_id, manifest_version);
1137 self.workers.submit_to_worker(region_id, request).await?;
1138
1139 receiver.await.context(RecvSnafu)?
1140 }
1141
1142 async fn remap_manifests(
1143 &self,
1144 request: RemapManifestsRequest,
1145 ) -> Result<RemapManifestsResponse> {
1146 let region_id = request.region_id;
1147 let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
1148 self.workers.submit_to_worker(region_id, request).await?;
1149 let manifest_paths = receiver.await.context(RecvSnafu)??;
1150 Ok(RemapManifestsResponse { manifest_paths })
1151 }
1152
1153 async fn copy_region_from(
1154 &self,
1155 region_id: RegionId,
1156 request: MitoCopyRegionFromRequest,
1157 ) -> Result<MitoCopyRegionFromResponse> {
1158 let (request, receiver) =
1159 WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
1160 self.workers.submit_to_worker(region_id, request).await?;
1161 let response = receiver.await.context(RecvSnafu)??;
1162 Ok(response)
1163 }
1164
1165 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1166 self.workers
1167 .get_region(region_id)
1168 .map(|region| region.region_role())
1169 }
1170}
1171
1172fn map_batch_responses(responses: Vec<(RegionId, Result<AffectedRows>)>) -> BatchResponses {
1173 responses
1174 .into_iter()
1175 .map(|(region_id, response)| {
1176 (
1177 region_id,
1178 response.map(RegionResponse::new).map_err(BoxedError::new),
1179 )
1180 })
1181 .collect()
1182}
1183
1184#[async_trait]
1185impl RegionEngine for MitoEngine {
1186 fn name(&self) -> &str {
1187 MITO_ENGINE_NAME
1188 }
1189
1190 #[tracing::instrument(skip_all)]
1191 async fn handle_batch_open_requests(
1192 &self,
1193 parallelism: usize,
1194 requests: Vec<(RegionId, RegionOpenRequest)>,
1195 ) -> Result<BatchResponses, BoxedError> {
1196 self.inner
1198 .handle_batch_open_requests(parallelism, requests)
1199 .await
1200 .map(map_batch_responses)
1201 .map_err(BoxedError::new)
1202 }
1203
1204 #[tracing::instrument(skip_all)]
1205 async fn handle_batch_catchup_requests(
1206 &self,
1207 parallelism: usize,
1208 requests: Vec<(RegionId, RegionCatchupRequest)>,
1209 ) -> Result<BatchResponses, BoxedError> {
1210 self.inner
1211 .handle_batch_catchup_requests(parallelism, requests)
1212 .await
1213 .map(map_batch_responses)
1214 .map_err(BoxedError::new)
1215 }
1216
1217 #[tracing::instrument(skip_all)]
1218 async fn handle_request(
1219 &self,
1220 region_id: RegionId,
1221 request: RegionRequest,
1222 ) -> Result<RegionResponse, BoxedError> {
1223 let _timer = HANDLE_REQUEST_ELAPSED
1224 .with_label_values(&[request.request_type()])
1225 .start_timer();
1226
1227 let is_alter = matches!(request, RegionRequest::Alter(_));
1228 let is_create = matches!(request, RegionRequest::Create(_));
1229 let mut response = self
1230 .inner
1231 .handle_request(region_id, request)
1232 .await
1233 .map(RegionResponse::new)
1234 .map_err(BoxedError::new)?;
1235
1236 if is_alter {
1237 self.handle_alter_response(region_id, &mut response)
1238 .map_err(BoxedError::new)?;
1239 } else if is_create {
1240 self.handle_create_response(region_id, &mut response)
1241 .map_err(BoxedError::new)?;
1242 }
1243
1244 Ok(response)
1245 }
1246
1247 #[tracing::instrument(skip_all)]
1248 async fn handle_query(
1249 &self,
1250 region_id: RegionId,
1251 request: ScanRequest,
1252 ) -> Result<RegionScannerRef, BoxedError> {
1253 self.scan_region(region_id, request)
1254 .map_err(BoxedError::new)?
1255 .region_scanner()
1256 .await
1257 .map_err(BoxedError::new)
1258 }
1259
1260 fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
1261 Some(self.inner.scan_memory_tracker.clone())
1262 }
1263
1264 async fn get_committed_sequence(
1265 &self,
1266 region_id: RegionId,
1267 ) -> Result<SequenceNumber, BoxedError> {
1268 self.inner
1269 .get_committed_sequence(region_id)
1270 .map_err(BoxedError::new)
1271 }
1272
1273 async fn get_metadata(
1275 &self,
1276 region_id: RegionId,
1277 ) -> std::result::Result<RegionMetadataRef, BoxedError> {
1278 self.inner.get_metadata(region_id).map_err(BoxedError::new)
1279 }
1280
1281 async fn stop(&self) -> std::result::Result<(), BoxedError> {
1287 self.inner.stop().await.map_err(BoxedError::new)
1288 }
1289
1290 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
1291 self.get_region_statistic(region_id)
1292 }
1293
1294 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
1295 self.inner
1296 .set_region_role(region_id, role)
1297 .map_err(BoxedError::new)
1298 }
1299
1300 async fn set_region_role_state_gracefully(
1301 &self,
1302 region_id: RegionId,
1303 region_role_state: SettableRegionRoleState,
1304 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
1305 let _timer = HANDLE_REQUEST_ELAPSED
1306 .with_label_values(&["set_region_role_state_gracefully"])
1307 .start_timer();
1308
1309 self.inner
1310 .set_region_role_state_gracefully(region_id, region_role_state)
1311 .await
1312 .map_err(BoxedError::new)
1313 }
1314
1315 async fn sync_region(
1316 &self,
1317 region_id: RegionId,
1318 request: SyncRegionFromRequest,
1319 ) -> Result<SyncRegionFromResponse, BoxedError> {
1320 let manifest_info = request
1321 .into_region_manifest_info()
1322 .context(UnexpectedSnafu {
1323 err_msg: "Expected a manifest info request",
1324 })
1325 .map_err(BoxedError::new)?;
1326 let (_, synced) = self
1327 .inner
1328 .sync_region(region_id, manifest_info)
1329 .await
1330 .map_err(BoxedError::new)?;
1331
1332 Ok(SyncRegionFromResponse::Mito { synced })
1333 }
1334
1335 async fn remap_manifests(
1336 &self,
1337 request: RemapManifestsRequest,
1338 ) -> Result<RemapManifestsResponse, BoxedError> {
1339 self.inner
1340 .remap_manifests(request)
1341 .await
1342 .map_err(BoxedError::new)
1343 }
1344
1345 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1346 self.inner.role(region_id)
1347 }
1348
1349 fn as_any(&self) -> &dyn Any {
1350 self
1351 }
1352}
1353
1354impl MitoEngine {
1355 fn handle_alter_response(
1356 &self,
1357 region_id: RegionId,
1358 response: &mut RegionResponse,
1359 ) -> Result<()> {
1360 if let Some(statistic) = self.region_statistic(region_id) {
1361 Self::encode_manifest_info_to_extensions(
1362 ®ion_id,
1363 statistic.manifest,
1364 &mut response.extensions,
1365 )?;
1366 }
1367 let column_metadatas = self
1368 .inner
1369 .find_region(region_id)
1370 .ok()
1371 .map(|r| r.metadata().column_metadatas.clone());
1372 if let Some(column_metadatas) = column_metadatas {
1373 Self::encode_column_metadatas_to_extensions(
1374 ®ion_id,
1375 column_metadatas,
1376 &mut response.extensions,
1377 )?;
1378 }
1379 Ok(())
1380 }
1381
1382 fn handle_create_response(
1383 &self,
1384 region_id: RegionId,
1385 response: &mut RegionResponse,
1386 ) -> Result<()> {
1387 let column_metadatas = self
1388 .inner
1389 .find_region(region_id)
1390 .ok()
1391 .map(|r| r.metadata().column_metadatas.clone());
1392 if let Some(column_metadatas) = column_metadatas {
1393 Self::encode_column_metadatas_to_extensions(
1394 ®ion_id,
1395 column_metadatas,
1396 &mut response.extensions,
1397 )?;
1398 }
1399 Ok(())
1400 }
1401}
1402
#[cfg(any(test, feature = "test"))]
// `new_for_test` takes every engine collaborator as an explicit argument so tests can
// inject fakes, which pushes it past clippy's default argument-count limit.
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Creates a [`MitoEngine`] for tests from explicitly supplied components.
    ///
    /// `config` is sanitized against `data_home` before use. A scan memory tracker is built
    /// from the resolved `scan_memory_limit` and reports usage, exhaustion and rejections to
    /// the scan metrics. Workers are started via `WorkerGroup::start_for_test`.
    ///
    /// # Errors
    /// Returns an error if sanitizing the config or starting the worker group fails.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;
        let config = Arc::new(config);

        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));

        // Clamp the reported total at zero before casting it to `u64`.
        let total_memory_bytes = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory_bytes) as usize;
        let scan_memory_tracker =
            QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
                .on_update(|usage| {
                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
                })
                .on_exhausted(|| {
                    SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
                })
                .on_reject(|| {
                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
                })
                .build();

        let workers = WorkerGroup::start_for_test(
            config.clone(),
            log_store,
            object_store_manager,
            write_buffer_manager,
            listener,
            schema_metadata_manager,
            file_ref_manager,
            time_provider,
            partition_expr_fetcher,
        )
        .await?;

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            scan_memory_tracker,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }

    /// Returns the purge scheduler owned by the engine's worker group.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}
1466
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    /// Builds a `RegionEdit` with the given file lists and every optional field unset.
    fn edit_with_files(files_to_add: Vec<FileMeta>, files_to_remove: Vec<FileMeta>) -> RegionEdit {
        RegionEdit {
            files_to_add,
            files_to_remove,
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        }
    }

    #[test]
    fn test_is_valid_region_edit() {
        // File-only edits are valid as long as they add or remove at least one file.
        assert!(is_valid_region_edit(&edit_with_files(
            vec![FileMeta::default()],
            vec![]
        )));
        assert!(!is_valid_region_edit(&edit_with_files(vec![], vec![])));
        assert!(is_valid_region_edit(&edit_with_files(
            vec![],
            vec![FileMeta::default()]
        )));
        assert!(is_valid_region_edit(&edit_with_files(
            vec![FileMeta::default()],
            vec![FileMeta::default()]
        )));

        // Setting compaction_time_window, flushed_entry_id or flushed_sequence is rejected,
        // even when the edit also adds a file.
        let with_window = RegionEdit {
            compaction_time_window: Some(Duration::from_secs(1)),
            ..edit_with_files(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&with_window));

        let with_entry_id = RegionEdit {
            flushed_entry_id: Some(1),
            ..edit_with_files(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&with_entry_id));

        let with_sequence = RegionEdit {
            flushed_sequence: Some(1),
            ..edit_with_files(vec![FileMeta::default()], vec![])
        };
        assert!(!is_valid_region_edit(&with_sequence));
    }
}