mito2/engine.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Mito region engine.

#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing};
use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use futures::future::{join_all, try_join_all};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{oneshot, Semaphore};

use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
    build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";

/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
    pub async fn new<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        Ok(MitoEngine {
            inner: Arc::new(
                EngineInner::new(
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    plugins,
                )
                .await?,
            ),
        })
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    /// Returns true if the specific region is opening.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    /// Returns the region disk/memory statistic.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.inner
            .workers
            .get_region(region_id)
            .map(|region| region.region_statistic())
    }

    /// Returns primary key encoding of the region.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.inner
            .workers
            .get_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

    /// Handles a substrait query and returns a stream of record batches.
    ///
    /// Note that the ordering of the output stream is not guaranteed. If ordering
    /// matters, please use [`scanner`] to build a [`Scanner`] to consume.
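    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled here; it assumes an already constructed
    /// `engine` and an existing `region_id`):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// use store_api::storage::ScanRequest;
    ///
    /// // Scan the whole region with a default (unfiltered) scan request.
    /// let mut stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// // Batches may arrive in any order; drain the stream to collect them.
    /// while let Some(batch) = stream.try_next().await? {
    ///     // process `batch` ...
    /// }
    /// ```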
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    /// Returns a scanner to scan for `request`.
    fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner()
    }

    /// Scans a region.
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

    /// Edits the region's metadata by [RegionEdit] directly. Use with care.
    /// Currently we only allow adding files to a region (the [RegionEdit] struct can only contain a non-empty "files_to_add" field).
    /// Any other region edit will result in an "invalid request" error.
    /// Also note that once a region has been edited directly, we MUST NOT write data to it thereafter.
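    ///
    /// # Example
    ///
    /// A minimal sketch of a valid edit (not compiled here; it assumes an existing
    /// `engine`, a `region_id`, and a hypothetical `file_meta` describing the SST
    /// file to add):
    ///
    /// ```ignore
    /// // Only `files_to_add` may be non-empty; every other field must stay
    /// // empty/None, otherwise the request is rejected as invalid.
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```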
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.inner.workers.get_region(id)
    }

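    /// Encodes the manifest info of the given region into the response `extensions`
    /// under [MANIFEST_INFO_EXTENSION_KEY].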
    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }
}

/// Checks whether the region edit is valid. Currently, only adding files to a region is considered valid.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            }
        )
}

/// Inner struct of [MitoEngine].
struct EngineInner {
    /// Region workers group.
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// The Wal raw entry reader.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

/// Returns the requests grouped by Kafka topic ([TopicGroupedRegionOpenRequests]), along with the remaining requests.
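///
/// A request is grouped by topic when its options carry a Kafka WAL config under
/// [WAL_OPTIONS_KEY]; requests without this key default to [WalOptions::RaftEngine]
/// and stay in the remaining list. A minimal sketch of how such an options entry
/// could be produced (not compiled here; `wal_options` is assumed to be a pre-built
/// `WalOptions::Kafka` value and `options` the options map of a `RegionOpenRequest`):
///
/// ```ignore
/// // Serialize the WAL options so this request is grouped under its Kafka topic.
/// options.insert(
///     WAL_OPTIONS_KEY.to_string(),
///     serde_json::to_string(&wal_options).unwrap(),
/// );
/// ```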
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    /// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`.
    async fn new<S: LogStore>(
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<EngineInner> {
        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(EngineInner {
            workers: WorkerGroup::start(
                config.clone(),
                log_store,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await?,
            config,
            wal_raw_entry_reader,
        })
    }

    /// Stop the inner engine.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    /// Get metadata of a region.
    ///
    /// Returns error if the region doesn't exist.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        Ok(region.metadata())
    }

    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Spawns a task to distribute the WAL entries to the region receivers.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        // Waits for the workers to finish opening the regions.
        let responses = join_all(responses).await;

        distribution.await.context(JoinSnafu)??;
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safety: the semaphore is never closed, so `acquire()` won't fail.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Safety: the semaphore is never closed, so `acquire()` won't fail.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    /// Handles [RegionRequest] and returns its execution result.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

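    /// Returns the last committed sequence number of the region.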
    fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
        // Reading a region doesn't need to go through the region worker thread.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version_ctrl = &region.version_control;
        let seq = Some(version_ctrl.committed_sequence());
        Ok(seq)
    }

    /// Handles the scan `request` and returns a [ScanRegion].
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        // Reading a region doesn't need to go through the region worker thread.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version = region.version();
        // Get cache.
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);

        Ok(scan_region)
    }

    /// Sets the [`RegionRole`] of the region.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        region.set_role(role);
        Ok(())
    }

    /// Sets the region to read-only and ensures no more writes to the region after it returns.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        // Note: handling this request acquires mutable ownership of the region to ensure
        // no other thread is writing to it. Therefore, we submit it to the worker.
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

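    /// Returns the [RegionRole] of the region, or `None` if the region is not found.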
    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        // TODO(weny): add metrics.
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            if let Some(statistic) = self.region_statistic(region_id) {
                Self::encode_manifest_info_to_extensions(
                    &region_id,
                    statistic.manifest,
                    &mut response.extensions,
                )
                .map_err(BoxedError::new)?;
            }
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .map_err(BoxedError::new)
    }

    async fn get_last_seq_num(
        &self,
        region_id: RegionId,
    ) -> Result<Option<SequenceNumber>, BoxedError> {
        self.inner
            .get_last_seq_num(region_id)
            .map_err(BoxedError::new)
    }

    /// Retrieves the region's metadata.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stops the engine.
    ///
    /// Stopping the engine doesn't stop the underlying log store, as other components might
    /// still use it. (When no other component references the log store, it will shut down
    /// automatically.)
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Test-only methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    time_provider,
                )
                .await?,
                config,
                wal_raw_entry_reader,
            }),
        })
    }

    /// Returns the purge scheduler.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: has only "files_to_add"
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        // Invalid: "files_to_add" is empty
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: "files_to_remove" is not empty
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: other fields are not all "None"s
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
        };
        assert!(!is_valid_region_edit(&edit));
    }
}