mito2/engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region engine.

#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{oneshot, Semaphore};

use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
    build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";

/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
    pub async fn new<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        Ok(MitoEngine {
            inner: Arc::new(
                EngineInner::new(
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    plugins,
                )
                .await?,
            ),
        })
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    /// Returns the region disk/memory statistic.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.inner
            .workers
            .get_region(region_id)
            .map(|region| region.region_statistic())
    }

    /// Returns primary key encoding of the region.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.inner
            .workers
            .get_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

    /// Handles a scan `request` and returns a stream of record batches.
    ///
    /// Notice that the output stream's ordering is not guaranteed. If the order
    /// matters, use [`scanner`] to build a [`Scanner`] and consume it.
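    ///
    /// A minimal usage sketch (assuming the region is already open and that
    /// [`ScanRequest`] provides a `Default` impl):
    ///
    /// ```ignore
    /// let stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// // Consume the (unordered) record batches from `stream` here.
    /// ```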
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    /// Returns a scanner to scan for `request`.
    fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner()
    }

    /// Scans a region.
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

    /// Edits the region's metadata by [RegionEdit] directly. Use with care.
    /// Currently we only allow adding files to a region (the [RegionEdit] struct can only
    /// contain a non-empty "files_to_add" field); any other editing intention results in an
    /// "invalid request" error.
    /// Also note that once a region is edited directly, we MUST not write data to it thereafter.
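    ///
    /// A sketch of a valid edit (assuming `file_meta` is a `FileMeta` describing an SST file
    /// that already exists in the region's storage):
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```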
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.inner.workers.get_region(id)
    }

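    /// Encodes the manifest info of a region into the response `extensions`
    /// under [MANIFEST_INFO_EXTENSION_KEY].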
    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }
}

/// Checks whether the region edit is valid. Only adding files to a region is considered valid now.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            }
        )
}

/// Inner struct of [MitoEngine].
struct EngineInner {
    /// Region workers group.
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// The Wal raw entry reader.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// Returns the requests ([TopicGroupedRegionOpenRequests]) grouped by topic, along with the
/// remaining requests.
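///
/// Requests whose WAL options deserialize to `WalOptions::Kafka` are grouped by topic so their
/// WAL entries can be replayed together; requests using `WalOptions::RaftEngine` or
/// `WalOptions::Noop` (including requests without a `WAL_OPTIONS_KEY` entry, which default to
/// `WalOptions::RaftEngine`) are returned as the remaining requests.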
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    /// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`.
    async fn new<S: LogStore>(
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<EngineInner> {
        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(EngineInner {
            workers: WorkerGroup::start(
                config.clone(),
                log_store,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await?,
            config,
            wal_raw_entry_reader,
        })
    }

    /// Stop the inner engine.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    /// Get metadata of a region.
    ///
    /// Returns error if the region doesn't exist.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        Ok(region.metadata())
    }

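    /// Opens all regions that share the same Kafka WAL `topic`, replaying their WAL entries
    /// through a shared entry distributor.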
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Waits for entries distribution.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Waits for the workers to finish opening the regions.
        let responses = join_all(responses).await;

        distribution.await.context(JoinSnafu)??;
        Ok(region_ids.into_iter().zip(responses).collect())
    }

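    /// Opens regions in batch, bounding concurrency with a semaphore of `parallelism` permits.
    /// Regions using a Kafka WAL are opened per topic via [Self::open_topic_regions]; the
    /// remaining regions are opened individually.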
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Safety: semaphore must exist
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    /// Handles a [RegionRequest] and returns its execution result.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

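    /// Returns the latest committed sequence number of the region,
    /// or an error if the region doesn't exist.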
    fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version_ctrl = &region.version_control;
        let seq = Some(version_ctrl.committed_sequence());
        Ok(seq)
    }

    /// Handles the scan `request` and returns a [ScanRegion].
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        // Reading a region doesn't need to go through the region worker thread.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version = region.version();
        // Get cache.
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);

        Ok(scan_region)
    }

    /// Sets the [`RegionRole`] of the region.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        region.set_role(role);
        Ok(())
    }

    /// Sets the role state of a region gracefully and ensures no more writes to the region after it returns.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        // Note: the request needs mutable ownership of the region to ensure no other thread
        // is writing to it, so we submit it to the worker.
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

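    /// Syncs the region manifest to the version carried by `manifest_info`.
    /// Returns the resulting manifest version and whether the manifest was synced.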
    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        // TODO(weny): add metrics.
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            if let Some(statistic) = self.region_statistic(region_id) {
                Self::encode_manifest_info_to_extensions(
                    &region_id,
                    statistic.manifest,
                    &mut response.extensions,
                )
                .map_err(BoxedError::new)?;
            }
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .map_err(BoxedError::new)
    }

    async fn get_last_seq_num(
        &self,
        region_id: RegionId,
    ) -> Result<Option<SequenceNumber>, BoxedError> {
        self.inner
            .get_last_seq_num(region_id)
            .map_err(BoxedError::new)
    }

    /// Retrieves the region's metadata.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stops the engine.
    ///
    /// Stopping the engine doesn't stop the underlying log store as other components might
    /// still use it. (When no other components are referencing the log store, it will
    /// automatically shut down.)
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Test-only methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    time_provider,
                )
                .await?,
                config,
                wal_raw_entry_reader,
            }),
        })
    }

    /// Returns the purge scheduler.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: has only "files_to_add"
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        // Invalid: "files_to_add" is empty
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: "files_to_remove" is not empty
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: other fields are not all "None"s
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
        };
        assert!(!is_valid_region_edit(&edit));
    }
797}