mito2/engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region engine.
16
17#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_catchup_test;
25#[cfg(test)]
26mod batch_open_test;
27#[cfg(test)]
28mod bump_committed_sequence_test;
29#[cfg(test)]
30mod catchup_test;
31#[cfg(test)]
32mod close_test;
33#[cfg(test)]
34mod compaction_test;
35#[cfg(test)]
36mod create_test;
37#[cfg(test)]
38mod drop_test;
39#[cfg(test)]
40mod edit_region_test;
41#[cfg(test)]
42mod filter_deleted_test;
43#[cfg(test)]
44mod flush_test;
45#[cfg(test)]
46mod index_build_test;
47#[cfg(any(test, feature = "test"))]
48pub mod listener;
49#[cfg(test)]
50mod merge_mode_test;
51#[cfg(test)]
52mod open_test;
53#[cfg(test)]
54mod parallel_test;
55#[cfg(test)]
56mod projection_test;
57#[cfg(test)]
58mod prune_test;
59#[cfg(test)]
60mod row_selector_test;
61#[cfg(test)]
62mod scan_corrupt;
63#[cfg(test)]
64mod scan_test;
65#[cfg(test)]
66mod set_role_state_test;
67#[cfg(test)]
68mod staging_test;
69#[cfg(test)]
70mod sync_test;
71#[cfg(test)]
72mod truncate_test;
73
74mod puffin_index;
75
76use std::any::Any;
77use std::collections::HashMap;
78use std::sync::Arc;
79use std::time::Instant;
80
81use api::region::RegionResponse;
82use async_trait::async_trait;
83use common_base::Plugins;
84use common_error::ext::BoxedError;
85use common_meta::key::SchemaMetadataManagerRef;
86use common_recordbatch::SendableRecordBatchStream;
87use common_telemetry::{info, tracing, warn};
88use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
89use futures::future::{join_all, try_join_all};
90use futures::stream::{self, Stream, StreamExt};
91use object_store::manager::ObjectStoreManagerRef;
92use snafu::{OptionExt, ResultExt, ensure};
93use store_api::ManifestVersion;
94use store_api::codec::PrimaryKeyEncoding;
95use store_api::logstore::LogStore;
96use store_api::logstore::provider::{KafkaProvider, Provider};
97use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
98use store_api::metric_engine_consts::{
99    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
100};
101use store_api::region_engine::{
102    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
103    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
104};
105use store_api::region_request::{
106    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
107};
108use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
109use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
110use tokio::sync::{Semaphore, oneshot};
111
112use crate::access_layer::RegionFilePathFactory;
113use crate::cache::{CacheManagerRef, CacheStrategy};
114use crate::config::MitoConfig;
115use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
116use crate::error::{
117    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
118    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
119};
120#[cfg(feature = "enterprise")]
121use crate::extension::BoxedExtensionRangeProviderFactory;
122use crate::gc::GcLimiterRef;
123use crate::manifest::action::RegionEdit;
124use crate::memtable::MemtableStats;
125use crate::metrics::HANDLE_REQUEST_ELAPSED;
126use crate::read::scan_region::{ScanRegion, Scanner};
127use crate::read::stream::ScanBatchStream;
128use crate::region::MitoRegionRef;
129use crate::region::opener::PartitionExprFetcherRef;
130use crate::request::{RegionEditRequest, WorkerRequest};
131use crate::sst::file::{FileMeta, RegionFileId};
132use crate::sst::file_ref::FileReferenceManagerRef;
133use crate::wal::entry_distributor::{
134    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
135};
136use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
137use crate::worker::WorkerGroup;
138
139pub const MITO_ENGINE_NAME: &str = "mito";
140
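/// Builder for [MitoEngine].
///
/// A minimal usage sketch; the constructor arguments (`data_home`, `config`,
/// `log_store`, and so on) are assumed to already exist in the caller's context:
///
/// ```ignore
/// let engine = MitoEngineBuilder::new(
///     data_home,
///     config,
///     log_store,
///     object_store_manager,
///     schema_metadata_manager,
///     file_ref_manager,
///     partition_expr_fetcher,
///     plugins,
/// )
/// .try_build()
/// .await?;
/// ```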
141pub struct MitoEngineBuilder<'a, S: LogStore> {
142    data_home: &'a str,
143    config: MitoConfig,
144    log_store: Arc<S>,
145    object_store_manager: ObjectStoreManagerRef,
146    schema_metadata_manager: SchemaMetadataManagerRef,
147    file_ref_manager: FileReferenceManagerRef,
148    partition_expr_fetcher: PartitionExprFetcherRef,
149    plugins: Plugins,
150    #[cfg(feature = "enterprise")]
151    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
152}
153
154impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
155    #[allow(clippy::too_many_arguments)]
156    pub fn new(
157        data_home: &'a str,
158        config: MitoConfig,
159        log_store: Arc<S>,
160        object_store_manager: ObjectStoreManagerRef,
161        schema_metadata_manager: SchemaMetadataManagerRef,
162        file_ref_manager: FileReferenceManagerRef,
163        partition_expr_fetcher: PartitionExprFetcherRef,
164        plugins: Plugins,
165    ) -> Self {
166        Self {
167            data_home,
168            config,
169            log_store,
170            object_store_manager,
171            schema_metadata_manager,
172            file_ref_manager,
173            plugins,
174            partition_expr_fetcher,
175            #[cfg(feature = "enterprise")]
176            extension_range_provider_factory: None,
177        }
178    }
179
180    #[cfg(feature = "enterprise")]
181    #[must_use]
182    pub fn with_extension_range_provider_factory(
183        self,
184        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
185    ) -> Self {
186        Self {
187            extension_range_provider_factory,
188            ..self
189        }
190    }
191
192    pub async fn try_build(mut self) -> Result<MitoEngine> {
193        self.config.sanitize(self.data_home)?;
194
195        let config = Arc::new(self.config);
196        let workers = WorkerGroup::start(
197            config.clone(),
198            self.log_store.clone(),
199            self.object_store_manager,
200            self.schema_metadata_manager,
201            self.file_ref_manager,
202            self.partition_expr_fetcher.clone(),
203            self.plugins,
204        )
205        .await?;
206        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
207        let inner = EngineInner {
208            workers,
209            config,
210            wal_raw_entry_reader,
211            #[cfg(feature = "enterprise")]
212            extension_range_provider_factory: None,
213        };
214
215        #[cfg(feature = "enterprise")]
216        let inner =
217            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
218
219        Ok(MitoEngine {
220            inner: Arc::new(inner),
221        })
222    }
223}
224
225/// Region engine implementation for timeseries data.
226#[derive(Clone)]
227pub struct MitoEngine {
228    inner: Arc<EngineInner>,
229}
230
231impl MitoEngine {
232    /// Returns a new [MitoEngine] with the given `config`, `log_store` and `object_store_manager`.
233    #[allow(clippy::too_many_arguments)]
234    pub async fn new<S: LogStore>(
235        data_home: &str,
236        config: MitoConfig,
237        log_store: Arc<S>,
238        object_store_manager: ObjectStoreManagerRef,
239        schema_metadata_manager: SchemaMetadataManagerRef,
240        file_ref_manager: FileReferenceManagerRef,
241        partition_expr_fetcher: PartitionExprFetcherRef,
242        plugins: Plugins,
243    ) -> Result<MitoEngine> {
244        let builder = MitoEngineBuilder::new(
245            data_home,
246            config,
247            log_store,
248            object_store_manager,
249            schema_metadata_manager,
250            file_ref_manager,
251            partition_expr_fetcher,
252            plugins,
253        );
254        builder.try_build().await
255    }
256
257    pub fn mito_config(&self) -> &MitoConfig {
258        &self.inner.config
259    }
260
261    pub fn cache_manager(&self) -> CacheManagerRef {
262        self.inner.workers.cache_manager()
263    }
264
265    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
266        self.inner.workers.file_ref_manager()
267    }
268
269    pub fn gc_limiter(&self) -> GcLimiterRef {
270        self.inner.workers.gc_limiter()
271    }
272
273    /// Gets all temporary ref files for the given region ids, excluding files that are already in the manifest.
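    ///
    /// A usage sketch, assuming an `engine` handle and a known `region_id`:
    ///
    /// ```ignore
    /// let refs_manifest = engine
    ///     .get_snapshot_of_unmanifested_refs([region_id])
    ///     .await?;
    /// ```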
274    pub async fn get_snapshot_of_unmanifested_refs(
275        &self,
276        region_ids: impl IntoIterator<Item = RegionId>,
277    ) -> Result<FileRefsManifest> {
278        let file_ref_mgr = self.file_ref_manager();
279
280        let region_ids = region_ids.into_iter().collect::<Vec<_>>();
281
282        // Convert region IDs to MitoRegionRef objects, erroring if any region doesn't exist.
283        let regions: Vec<MitoRegionRef> = region_ids
284            .into_iter()
285            .map(|region_id| {
286                self.find_region(region_id)
287                    .with_context(|| RegionNotFoundSnafu { region_id })
288            })
289            .collect::<Result<_>>()?;
290
291        file_ref_mgr
292            .get_snapshot_of_unmanifested_refs(regions)
293            .await
294    }
295
296    /// Returns true if the specific region exists.
297    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
298        self.inner.workers.is_region_exists(region_id)
299    }
300
301    /// Returns true if the specific region is opening.
302    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
303        self.inner.workers.is_region_opening(region_id)
304    }
305
306    /// Returns true if the specific region is catching up.
307    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
308        self.inner.workers.is_region_catching_up(region_id)
309    }
310
311    /// Returns the region disk/memory statistic.
312    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
313        self.find_region(region_id)
314            .map(|region| region.region_statistic())
315    }
316
317    /// Returns primary key encoding of the region.
318    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
319        self.find_region(region_id)
320            .map(|r| r.primary_key_encoding())
321    }
322
323    /// Handles a substrait query and returns a stream of record batches.
324    ///
325    /// Notice that the output stream's ordering is not guaranteed. If ordering
326    /// matters, please use [`scanner`] to build a [`Scanner`] to consume.
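    ///
    /// A minimal usage sketch (`engine` and `region_id` are assumed to exist in
    /// the caller's context):
    ///
    /// ```ignore
    /// let stream = engine
    ///     .scan_to_stream(region_id, ScanRequest::default())
    ///     .await?;
    /// ```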
327    #[tracing::instrument(skip_all)]
328    pub async fn scan_to_stream(
329        &self,
330        region_id: RegionId,
331        request: ScanRequest,
332    ) -> Result<SendableRecordBatchStream, BoxedError> {
333        self.scanner(region_id, request)
334            .await
335            .map_err(BoxedError::new)?
336            .scan()
337            .await
338    }
339
340    /// Scan [`Batch`]es by [`ScanRequest`].
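    ///
    /// A sketch of scanning raw batches with deleted rows filtered out, assuming
    /// an `engine` handle and a known `region_id`:
    ///
    /// ```ignore
    /// let batch_stream = engine
    ///     .scan_batch(region_id, ScanRequest::default(), true)
    ///     .await?;
    /// ```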
341    pub async fn scan_batch(
342        &self,
343        region_id: RegionId,
344        request: ScanRequest,
345        filter_deleted: bool,
346    ) -> Result<ScanBatchStream> {
347        let mut scan_region = self.scan_region(region_id, request)?;
348        scan_region.set_filter_deleted(filter_deleted);
349        scan_region.scanner().await?.scan_batch()
350    }
351
352    /// Returns a scanner to scan for `request`.
353    async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
354        self.scan_region(region_id, request)?.scanner().await
355    }
356
357    /// Scans a region.
358    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
359        self.inner.scan_region(region_id, request)
360    }
361
362    /// Edits a region's metadata by applying a [RegionEdit] directly. Use with care.
363    /// Currently we only allow adding files to a region (the [RegionEdit] struct may only contain a non-empty "files_to_add" field).
364    /// Any other region editing intention will result in an "invalid request" error.
365    /// Also note that if a region is edited directly, we MUST NOT write data to it thereafter.
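    ///
    /// A sketch of a valid edit, assuming `engine`, `region_id`, and a `FileMeta`
    /// value `file_meta` describing an SST that already exists in the object store:
    ///
    /// ```ignore
    /// let edit = RegionEdit {
    ///     files_to_add: vec![file_meta],
    ///     files_to_remove: vec![],
    ///     timestamp_ms: None,
    ///     compaction_time_window: None,
    ///     flushed_entry_id: None,
    ///     flushed_sequence: None,
    ///     committed_sequence: None,
    /// };
    /// engine.edit_region(region_id, edit).await?;
    /// ```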
366    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
367        let _timer = HANDLE_REQUEST_ELAPSED
368            .with_label_values(&["edit_region"])
369            .start_timer();
370
371        ensure!(
372            is_valid_region_edit(&edit),
373            InvalidRequestSnafu {
374                region_id,
375                reason: "invalid region edit"
376            }
377        );
378
379        let (tx, rx) = oneshot::channel();
380        let request = WorkerRequest::EditRegion(RegionEditRequest {
381            region_id,
382            edit,
383            tx,
384        });
385        self.inner
386            .workers
387            .submit_to_worker(region_id, request)
388            .await?;
389        rx.await.context(RecvSnafu)?
390    }
391
392    #[cfg(test)]
393    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
394        self.find_region(id)
395    }
396
397    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
398        self.inner.workers.get_region(region_id)
399    }
400
401    fn encode_manifest_info_to_extensions(
402        region_id: &RegionId,
403        manifest_info: RegionManifestInfo,
404        extensions: &mut HashMap<String, Vec<u8>>,
405    ) -> Result<()> {
406        let region_manifest_info = vec![(*region_id, manifest_info)];
407
408        extensions.insert(
409            MANIFEST_INFO_EXTENSION_KEY.to_string(),
410            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
411        );
412        info!(
413            "Added manifest info: {:?} to extensions, region_id: {:?}",
414            region_manifest_info, region_id
415        );
416        Ok(())
417    }
418
419    fn encode_column_metadatas_to_extensions(
420        region_id: &RegionId,
421        column_metadatas: Vec<ColumnMetadata>,
422        extensions: &mut HashMap<String, Vec<u8>>,
423    ) -> Result<()> {
424        extensions.insert(
425            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
426            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
427        );
428        info!(
429            "Added column metadatas: {:?} to extensions, region_id: {:?}",
430            column_metadatas, region_id
431        );
432        Ok(())
433    }
434
435    /// Finds the current version's memtable and SST stats by region_id.
436    /// The stats must be collected from the same version snapshot to ensure data consistency.
437    pub fn find_memtable_and_sst_stats(
438        &self,
439        region_id: RegionId,
440    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
441        let region = self
442            .find_region(region_id)
443            .context(RegionNotFoundSnafu { region_id })?;
444
445        let version = region.version();
446        let memtable_stats = version
447            .memtables
448            .list_memtables()
449            .iter()
450            .map(|x| x.stats())
451            .collect::<Vec<_>>();
452
453        let sst_stats = version
454            .ssts
455            .levels()
456            .iter()
457            .flat_map(|level| level.files().map(|x| x.meta_ref()))
458            .cloned()
459            .collect::<Vec<_>>();
460        Ok((memtable_stats, sst_stats))
461    }
462
463    /// Lists all SSTs from the manifest of all regions in the engine.
464    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
465        let node_id = self.inner.workers.file_ref_manager().node_id();
466        let regions = self.inner.workers.all_regions();
467
468        let mut results = Vec::new();
469        for region in regions {
470            let mut entries = region.manifest_sst_entries().await;
471            for e in &mut entries {
472                e.node_id = node_id;
473            }
474            results.extend(entries);
475        }
476
477        results
478    }
479
480    /// Lists metadata about all puffin index targets stored in the engine.
481    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
482        let node_id = self.inner.workers.file_ref_manager().node_id();
483        let cache_manager = self.inner.workers.cache_manager();
484        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
485        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
486        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();
487
488        let mut results = Vec::new();
489
490        for region in self.inner.workers.all_regions() {
491            let manifest_entries = region.manifest_sst_entries().await;
492            let access_layer = region.access_layer.clone();
493            let table_dir = access_layer.table_dir().to_string();
494            let path_type = access_layer.path_type();
495            let object_store = access_layer.object_store().clone();
496            let puffin_factory = access_layer.puffin_manager_factory().clone();
497            let path_factory = RegionFilePathFactory::new(table_dir, path_type);
498
499            let entry_futures = manifest_entries.into_iter().map(|entry| {
500                let object_store = object_store.clone();
501                let path_factory = path_factory.clone();
502                let puffin_factory = puffin_factory.clone();
503                let puffin_metadata_cache = puffin_metadata_cache.clone();
504                let bloom_filter_cache = bloom_filter_cache.clone();
505                let inverted_index_cache = inverted_index_cache.clone();
506
507                async move {
508                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
509                        return Vec::new();
510                    };
511
512                    let Some(index_file_id) = entry.index_file_id.as_ref() else {
513                        return Vec::new();
514                    };
515                    let file_id = match FileId::parse_str(index_file_id) {
516                        Ok(file_id) => file_id,
517                        Err(err) => {
518                            warn!(
519                                err;
520                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
521                                entry.table_dir,
522                                index_file_id
523                            );
524                            return Vec::new();
525                        }
526                    };
527                    let region_file_id = RegionFileId::new(entry.region_id, file_id);
528                    let context = IndexEntryContext {
529                        table_dir: &entry.table_dir,
530                        index_file_path: index_file_path.as_str(),
531                        region_id: entry.region_id,
532                        table_id: entry.table_id,
533                        region_number: entry.region_number,
534                        region_group: entry.region_group,
535                        region_sequence: entry.region_sequence,
536                        file_id: index_file_id,
537                        index_file_size: entry.index_file_size,
538                        node_id,
539                    };
540
541                    let manager = puffin_factory
542                        .build(object_store, path_factory)
543                        .with_puffin_metadata_cache(puffin_metadata_cache);
544
545                    collect_index_entries_from_puffin(
546                        manager,
547                        region_file_id,
548                        context,
549                        bloom_filter_cache,
550                        inverted_index_cache,
551                    )
552                    .await
553                }
554            });
555
556            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8); // Parallelism is 8.
557            while let Some(mut metas) = meta_stream.next().await {
558                results.append(&mut metas);
559            }
560        }
561
562        results
563    }
564
565    /// Lists all SSTs from the storage layer of all regions in the engine.
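    ///
    /// The returned stream can be collected as shown in this sketch (an `engine`
    /// handle is assumed):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let entries = engine.all_ssts_from_storage().try_collect::<Vec<_>>().await?;
    /// ```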
566    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
567        let node_id = self.inner.workers.file_ref_manager().node_id();
568        let regions = self.inner.workers.all_regions();
569
570        let mut layers_distinct_table_dirs = HashMap::new();
571        for region in regions {
572            let table_dir = region.access_layer.table_dir();
573            if !layers_distinct_table_dirs.contains_key(table_dir) {
574                layers_distinct_table_dirs
575                    .insert(table_dir.to_string(), region.access_layer.clone());
576            }
577        }
578
579        stream::iter(layers_distinct_table_dirs)
580            .map(|(_, access_layer)| access_layer.storage_sst_entries())
581            .flatten()
582            .map(move |entry| {
583                entry.map(move |mut entry| {
584                    entry.node_id = node_id;
585                    entry
586                })
587            })
588    }
589}
590
591/// Checks whether the region edit is valid. Currently, only adding files to a region is considered valid.
592fn is_valid_region_edit(edit: &RegionEdit) -> bool {
593    !edit.files_to_add.is_empty()
594        && edit.files_to_remove.is_empty()
595        && matches!(
596            edit,
597            RegionEdit {
598                files_to_add: _,
599                files_to_remove: _,
600                timestamp_ms: _,
601                compaction_time_window: None,
602                flushed_entry_id: None,
603                flushed_sequence: None,
604                ..
605            }
606        )
607}
608
609/// Inner struct of [MitoEngine].
610struct EngineInner {
611    /// Region workers group.
612    workers: WorkerGroup,
613    /// Config of the engine.
614    config: Arc<MitoConfig>,
616    /// The WAL raw entry reader.
616    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
617    #[cfg(feature = "enterprise")]
618    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
619}
620
621type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
622
623/// Returns requests ([TopicGroupedRegionOpenRequests]) grouped by topic, along with the remaining requests.
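///
/// A hedged sketch of how a request is routed (the construction of the
/// `RegionOpenRequest` itself is omitted, and `KafkaWalOptions` is assumed to be
/// the payload type of [WalOptions::Kafka]): Kafka WAL options stored under
/// [WAL_OPTIONS_KEY] send the region into the per-topic map, while raft-engine
/// and noop regions stay in the remaining list.
///
/// ```ignore
/// use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
///
/// let wal_options = WalOptions::Kafka(KafkaWalOptions {
///     topic: "my_topic".to_string(),
/// });
/// let mut options = HashMap::new();
/// options.insert(
///     WAL_OPTIONS_KEY.to_string(),
///     serde_json::to_string(&wal_options).unwrap(),
/// );
/// // A `(region_id, RegionOpenRequest)` carrying these options is grouped under
/// // the "my_topic" entry of the returned map.
/// ```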
624fn prepare_batch_open_requests(
625    requests: Vec<(RegionId, RegionOpenRequest)>,
626) -> Result<(
627    TopicGroupedRegionOpenRequests,
628    Vec<(RegionId, RegionOpenRequest)>,
629)> {
630    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
631    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
632    for (region_id, request) in requests {
633        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
634            serde_json::from_str(options).context(SerdeJsonSnafu)?
635        } else {
636            WalOptions::RaftEngine
637        };
638        match options {
639            WalOptions::Kafka(options) => {
640                topic_to_regions
641                    .entry(options.topic)
642                    .or_default()
643                    .push((region_id, request));
644            }
645            WalOptions::RaftEngine | WalOptions::Noop => {
646                remaining_regions.push((region_id, request));
647            }
648        }
649    }
650
651    Ok((topic_to_regions, remaining_regions))
652}
653
654impl EngineInner {
655    #[cfg(feature = "enterprise")]
656    #[must_use]
657    fn with_extension_range_provider_factory(
658        self,
659        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
660    ) -> Self {
661        Self {
662            extension_range_provider_factory,
663            ..self
664        }
665    }
666
667    /// Stop the inner engine.
668    async fn stop(&self) -> Result<()> {
669        self.workers.stop().await
670    }
671
672    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
673        self.workers
674            .get_region(region_id)
675            .context(RegionNotFoundSnafu { region_id })
676    }
677
678    /// Get metadata of a region.
679    ///
680    /// Returns error if the region doesn't exist.
681    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
682        // Reading a region doesn't need to go through the region worker thread.
683        let region = self.find_region(region_id)?;
684        Ok(region.metadata())
685    }
686
687    async fn open_topic_regions(
688        &self,
689        topic: String,
690        region_requests: Vec<(RegionId, RegionOpenRequest)>,
691    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
692        let now = Instant::now();
693        let region_ids = region_requests
694            .iter()
695            .map(|(region_id, _)| *region_id)
696            .collect::<Vec<_>>();
697        let provider = Provider::kafka_provider(topic);
698        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
699            provider.clone(),
700            self.wal_raw_entry_reader.clone(),
701            &region_ids,
702            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
703        );
704
705        let mut responses = Vec::with_capacity(region_requests.len());
706        for ((region_id, request), entry_receiver) in
707            region_requests.into_iter().zip(entry_receivers)
708        {
709            let (request, receiver) =
710                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
711            self.workers.submit_to_worker(region_id, request).await?;
712            responses.push(async move { receiver.await.context(RecvSnafu)? });
713        }
714
715        // Waits for entries distribution.
716        let distribution =
717            common_runtime::spawn_global(async move { distributor.distribute().await });
718        // Waits for worker returns.
719        let responses = join_all(responses).await;
720        distribution.await.context(JoinSnafu)??;
721
722        let num_failure = responses.iter().filter(|r| r.is_err()).count();
723        info!(
724            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
725            region_ids.len() - num_failure,
726            // Safety: provider is kafka provider.
727            provider.as_kafka_provider().unwrap(),
728            num_failure,
729            now.elapsed(),
730        );
731        Ok(region_ids.into_iter().zip(responses).collect())
732    }
733
734    async fn handle_batch_open_requests(
735        &self,
736        parallelism: usize,
737        requests: Vec<(RegionId, RegionOpenRequest)>,
738    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
739        let semaphore = Arc::new(Semaphore::new(parallelism));
740        let (topic_to_region_requests, remaining_region_requests) =
741            prepare_batch_open_requests(requests)?;
742        let mut responses =
743            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
744
745        if !topic_to_region_requests.is_empty() {
746            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
747            for (topic, region_requests) in topic_to_region_requests {
748                let semaphore_moved = semaphore.clone();
749                tasks.push(async move {
750                    // Safety: semaphore must exist
751                    let _permit = semaphore_moved.acquire().await.unwrap();
752                    self.open_topic_regions(topic, region_requests).await
753                })
754            }
755            let r = try_join_all(tasks).await?;
756            responses.extend(r.into_iter().flatten());
757        }
758
759        if !remaining_region_requests.is_empty() {
760            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
761            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
762            for (region_id, request) in remaining_region_requests {
763                let semaphore_moved = semaphore.clone();
764                region_ids.push(region_id);
765                tasks.push(async move {
766                    // Safety: semaphore must exist
767                    let _permit = semaphore_moved.acquire().await.unwrap();
768                    let (request, receiver) =
769                        WorkerRequest::new_open_region_request(region_id, request, None);
770
771                    self.workers.submit_to_worker(region_id, request).await?;
772
773                    receiver.await.context(RecvSnafu)?
774                })
775            }
776
777            let results = join_all(tasks).await;
778            responses.extend(region_ids.into_iter().zip(results));
779        }
780
781        Ok(responses)
782    }
783
784    async fn catchup_topic_regions(
785        &self,
786        provider: Provider,
787        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
788    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
789        let now = Instant::now();
790        let region_ids = region_requests
791            .iter()
792            .map(|(region_id, _)| *region_id)
793            .collect::<Vec<_>>();
794        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
795            provider.clone(),
796            self.wal_raw_entry_reader.clone(),
797            &region_ids,
798            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
799        );
800
801        let mut responses = Vec::with_capacity(region_requests.len());
802        for ((region_id, request), entry_receiver) in
803            region_requests.into_iter().zip(entry_receivers)
804        {
805            let (request, receiver) =
806                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
807            self.workers.submit_to_worker(region_id, request).await?;
808            responses.push(async move { receiver.await.context(RecvSnafu)? });
809        }
810
811        // Wait for entries distribution.
812        let distribution =
813            common_runtime::spawn_global(async move { distributor.distribute().await });
814        // Wait for worker returns.
815        let responses = join_all(responses).await;
816        distribution.await.context(JoinSnafu)??;
817
818        let num_failure = responses.iter().filter(|r| r.is_err()).count();
819        info!(
820            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
821            region_ids.len() - num_failure,
822            // Safety: provider is kafka provider.
823            provider.as_kafka_provider().unwrap(),
824            num_failure,
825            now.elapsed(),
826        );
827
828        Ok(region_ids.into_iter().zip(responses).collect())
829    }
830
831    async fn handle_batch_catchup_requests(
832        &self,
833        parallelism: usize,
834        requests: Vec<(RegionId, RegionCatchupRequest)>,
835    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
836        let mut responses = Vec::with_capacity(requests.len());
837        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
838        let mut remaining_region_requests = vec![];
839
840        for (region_id, request) in requests {
841            match self.workers.get_region(region_id) {
842                Some(region) => match region.provider.as_kafka_provider() {
843                    Some(provider) => {
844                        topic_regions
845                            .entry(provider.clone())
846                            .or_default()
847                            .push((region_id, request));
848                    }
849                    None => {
850                        remaining_region_requests.push((region_id, request));
851                    }
852                },
853                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
854            }
855        }
856
857        let semaphore = Arc::new(Semaphore::new(parallelism));
858
859        if !topic_regions.is_empty() {
860            let mut tasks = Vec::with_capacity(topic_regions.len());
861            for (provider, region_requests) in topic_regions {
862                let semaphore_moved = semaphore.clone();
863                tasks.push(async move {
864                    // Safety: semaphore must exist
865                    let _permit = semaphore_moved.acquire().await.unwrap();
866                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
867                        .await
868                })
869            }
870
871            let r = try_join_all(tasks).await?;
872            responses.extend(r.into_iter().flatten());
873        }
874
875        if !remaining_region_requests.is_empty() {
876            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
877            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
878            for (region_id, request) in remaining_region_requests {
879                let semaphore_moved = semaphore.clone();
880                region_ids.push(region_id);
881                tasks.push(async move {
882                    // Safety: semaphore must exist
883                    let _permit = semaphore_moved.acquire().await.unwrap();
884                    let (request, receiver) =
885                        WorkerRequest::new_catchup_region_request(region_id, request, None);
886
887                    self.workers.submit_to_worker(region_id, request).await?;
888
889                    receiver.await.context(RecvSnafu)?
890                })
891            }
892
893            let results = join_all(tasks).await;
894            responses.extend(region_ids.into_iter().zip(results));
895        }
896
897        Ok(responses)
898    }
899
900    /// Handles a [RegionRequest] and returns its execution result.
901    async fn handle_request(
902        &self,
903        region_id: RegionId,
904        request: RegionRequest,
905    ) -> Result<AffectedRows> {
906        let region_metadata = self.get_metadata(region_id).ok();
907        let (request, receiver) =
908            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
909        self.workers.submit_to_worker(region_id, request).await?;
910
911        receiver.await.context(RecvSnafu)?
912    }
913
914    /// Returns the sequence number of the latest committed data.
915    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
916        // Reading a region doesn't need to go through the region worker thread.
917        self.find_region(region_id)
918            .map(|r| r.find_committed_sequence())
919    }
920
921    /// Handles the scan `request` and returns a [ScanRegion].
922    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
923        let query_start = Instant::now();
924        // Reading a region doesn't need to go through the region worker thread.
925        let region = self.find_region(region_id)?;
926        let version = region.version();
927        // Get cache.
928        let cache_manager = self.workers.cache_manager();
929
930        let scan_region = ScanRegion::new(
931            version,
932            region.access_layer.clone(),
933            request,
934            CacheStrategy::EnableAll(cache_manager),
935        )
936        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
937        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
938        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
939        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
940        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
941        .with_start_time(query_start)
942        .with_flat_format(self.config.default_experimental_flat_format);
943
944        #[cfg(feature = "enterprise")]
945        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
946
947        Ok(scan_region)
948    }
949
950    #[cfg(feature = "enterprise")]
951    fn maybe_fill_extension_range_provider(
952        &self,
953        mut scan_region: ScanRegion,
954        region: MitoRegionRef,
955    ) -> ScanRegion {
956        if region.is_follower()
957            && let Some(factory) = self.extension_range_provider_factory.as_ref()
958        {
959            scan_region
960                .set_extension_range_provider(factory.create_extension_range_provider(region));
961        }
962        scan_region
963    }
964
965    /// Sets the [`RegionRole`] of the region.
966    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
967        let region = self.find_region(region_id)?;
968        region.set_role(role);
969        Ok(())
970    }
971
972    /// Sets the region role state gracefully and ensures no more writes to the region after it returns.
973    async fn set_region_role_state_gracefully(
974        &self,
975        region_id: RegionId,
976        region_role_state: SettableRegionRoleState,
977    ) -> Result<SetRegionRoleStateResponse> {
978        // Note: this needs mutable ownership of the region to ensure no other
979        // thread is writing to it, therefore we submit the request to the region worker.
980        let (request, receiver) =
981            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
982        self.workers.submit_to_worker(region_id, request).await?;
983
984        receiver.await.context(RecvSnafu)
985    }
986
987    async fn sync_region(
988        &self,
989        region_id: RegionId,
990        manifest_info: RegionManifestInfo,
991    ) -> Result<(ManifestVersion, bool)> {
992        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
993        let manifest_version = manifest_info.data_manifest_version();
994        let (request, receiver) =
995            WorkerRequest::new_sync_region_request(region_id, manifest_version);
996        self.workers.submit_to_worker(region_id, request).await?;
997
998        receiver.await.context(RecvSnafu)?
999    }
1000
1001    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1002        self.workers.get_region(region_id).map(|region| {
1003            if region.is_follower() {
1004                RegionRole::Follower
1005            } else {
1006                RegionRole::Leader
1007            }
1008        })
1009    }
1010}
1011
1012#[async_trait]
1013impl RegionEngine for MitoEngine {
1014    fn name(&self) -> &str {
1015        MITO_ENGINE_NAME
1016    }
1017
1018    #[tracing::instrument(skip_all)]
1019    async fn handle_batch_open_requests(
1020        &self,
1021        parallelism: usize,
1022        requests: Vec<(RegionId, RegionOpenRequest)>,
1023    ) -> Result<BatchResponses, BoxedError> {
1024        // TODO(weny): add metrics.
1025        self.inner
1026            .handle_batch_open_requests(parallelism, requests)
1027            .await
1028            .map(|responses| {
1029                responses
1030                    .into_iter()
1031                    .map(|(region_id, response)| {
1032                        (
1033                            region_id,
1034                            response.map(RegionResponse::new).map_err(BoxedError::new),
1035                        )
1036                    })
1037                    .collect::<Vec<_>>()
1038            })
1039            .map_err(BoxedError::new)
1040    }
1041
1042    #[tracing::instrument(skip_all)]
1043    async fn handle_batch_catchup_requests(
1044        &self,
1045        parallelism: usize,
1046        requests: Vec<(RegionId, RegionCatchupRequest)>,
1047    ) -> Result<BatchResponses, BoxedError> {
1048        self.inner
1049            .handle_batch_catchup_requests(parallelism, requests)
1050            .await
1051            .map(|responses| {
1052                responses
1053                    .into_iter()
1054                    .map(|(region_id, response)| {
1055                        (
1056                            region_id,
1057                            response.map(RegionResponse::new).map_err(BoxedError::new),
1058                        )
1059                    })
1060                    .collect::<Vec<_>>()
1061            })
1062            .map_err(BoxedError::new)
1063    }
1064
1065    #[tracing::instrument(skip_all)]
1066    async fn handle_request(
1067        &self,
1068        region_id: RegionId,
1069        request: RegionRequest,
1070    ) -> Result<RegionResponse, BoxedError> {
1071        let _timer = HANDLE_REQUEST_ELAPSED
1072            .with_label_values(&[request.request_type()])
1073            .start_timer();
1074
1075        let is_alter = matches!(request, RegionRequest::Alter(_));
1076        let is_create = matches!(request, RegionRequest::Create(_));
1077        let mut response = self
1078            .inner
1079            .handle_request(region_id, request)
1080            .await
1081            .map(RegionResponse::new)
1082            .map_err(BoxedError::new)?;
1083
1084        if is_alter {
1085            self.handle_alter_response(region_id, &mut response)
1086                .map_err(BoxedError::new)?;
1087        } else if is_create {
1088            self.handle_create_response(region_id, &mut response)
1089                .map_err(BoxedError::new)?;
1090        }
1091
1092        Ok(response)
1093    }
1094
1095    #[tracing::instrument(skip_all)]
1096    async fn handle_query(
1097        &self,
1098        region_id: RegionId,
1099        request: ScanRequest,
1100    ) -> Result<RegionScannerRef, BoxedError> {
1101        self.scan_region(region_id, request)
1102            .map_err(BoxedError::new)?
1103            .region_scanner()
1104            .await
1105            .map_err(BoxedError::new)
1106    }
1107
1108    async fn get_committed_sequence(
1109        &self,
1110        region_id: RegionId,
1111    ) -> Result<SequenceNumber, BoxedError> {
1112        self.inner
1113            .get_committed_sequence(region_id)
1114            .map_err(BoxedError::new)
1115    }
1116
1117    /// Retrieves the region's metadata.
1118    async fn get_metadata(
1119        &self,
1120        region_id: RegionId,
1121    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
1122        self.inner.get_metadata(region_id).map_err(BoxedError::new)
1123    }
1124
1125    /// Stop the engine.
1126    ///
1127    /// Stopping the engine doesn't stop the underlying log store as other components might
1128    /// still use it. (When no other components are referencing the log store, it will
1129    /// automatically shut down.)
1130    async fn stop(&self) -> std::result::Result<(), BoxedError> {
1131        self.inner.stop().await.map_err(BoxedError::new)
1132    }
1133
1134    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
1135        self.get_region_statistic(region_id)
1136    }
1137
1138    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
1139        self.inner
1140            .set_region_role(region_id, role)
1141            .map_err(BoxedError::new)
1142    }
1143
1144    async fn set_region_role_state_gracefully(
1145        &self,
1146        region_id: RegionId,
1147        region_role_state: SettableRegionRoleState,
1148    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
1149        let _timer = HANDLE_REQUEST_ELAPSED
1150            .with_label_values(&["set_region_role_state_gracefully"])
1151            .start_timer();
1152
1153        self.inner
1154            .set_region_role_state_gracefully(region_id, region_role_state)
1155            .await
1156            .map_err(BoxedError::new)
1157    }
1158
1159    async fn sync_region(
1160        &self,
1161        region_id: RegionId,
1162        manifest_info: RegionManifestInfo,
1163    ) -> Result<SyncManifestResponse, BoxedError> {
1164        let (_, synced) = self
1165            .inner
1166            .sync_region(region_id, manifest_info)
1167            .await
1168            .map_err(BoxedError::new)?;
1169
1170        Ok(SyncManifestResponse::Mito { synced })
1171    }
1172
1173    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
1174        self.inner.role(region_id)
1175    }
1176
1177    fn as_any(&self) -> &dyn Any {
1178        self
1179    }
1180}
1181
1182impl MitoEngine {
1183    fn handle_alter_response(
1184        &self,
1185        region_id: RegionId,
1186        response: &mut RegionResponse,
1187    ) -> Result<()> {
1188        if let Some(statistic) = self.region_statistic(region_id) {
1189            Self::encode_manifest_info_to_extensions(
1190                &region_id,
1191                statistic.manifest,
1192                &mut response.extensions,
1193            )?;
1194        }
1195        let column_metadatas = self
1196            .inner
1197            .find_region(region_id)
1198            .ok()
1199            .map(|r| r.metadata().column_metadatas.clone());
1200        if let Some(column_metadatas) = column_metadatas {
1201            Self::encode_column_metadatas_to_extensions(
1202                &region_id,
1203                column_metadatas,
1204                &mut response.extensions,
1205            )?;
1206        }
1207        Ok(())
1208    }
1209
1210    fn handle_create_response(
1211        &self,
1212        region_id: RegionId,
1213        response: &mut RegionResponse,
1214    ) -> Result<()> {
1215        let column_metadatas = self
1216            .inner
1217            .find_region(region_id)
1218            .ok()
1219            .map(|r| r.metadata().column_metadatas.clone());
1220        if let Some(column_metadatas) = column_metadatas {
1221            Self::encode_column_metadatas_to_extensions(
1222                &region_id,
1223                column_metadatas,
1224                &mut response.extensions,
1225            )?;
1226        }
1227        Ok(())
1228    }
1229}
1230
1231// Test-only methods.
1232#[cfg(any(test, feature = "test"))]
1233#[allow(clippy::too_many_arguments)]
1234impl MitoEngine {
1235    /// Returns a new [MitoEngine] for tests.
1236    pub async fn new_for_test<S: LogStore>(
1237        data_home: &str,
1238        mut config: MitoConfig,
1239        log_store: Arc<S>,
1240        object_store_manager: ObjectStoreManagerRef,
1241        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
1242        listener: Option<crate::engine::listener::EventListenerRef>,
1243        time_provider: crate::time_provider::TimeProviderRef,
1244        schema_metadata_manager: SchemaMetadataManagerRef,
1245        file_ref_manager: FileReferenceManagerRef,
1246        partition_expr_fetcher: PartitionExprFetcherRef,
1247    ) -> Result<MitoEngine> {
1248        config.sanitize(data_home)?;
1249
1250        let config = Arc::new(config);
1251        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
1252        Ok(MitoEngine {
1253            inner: Arc::new(EngineInner {
1254                workers: WorkerGroup::start_for_test(
1255                    config.clone(),
1256                    log_store,
1257                    object_store_manager,
1258                    write_buffer_manager,
1259                    listener,
1260                    schema_metadata_manager,
1261                    file_ref_manager,
1262                    time_provider,
1263                    partition_expr_fetcher,
1264                )
1265                .await?,
1266                config,
1267                wal_raw_entry_reader,
1268                #[cfg(feature = "enterprise")]
1269                extension_range_provider_factory: None,
1270            }),
1271        })
1272    }
1273
1274    /// Returns the purge scheduler.
1275    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
1276        self.inner.workers.purge_scheduler()
1277    }
1278}
1279
1280#[cfg(test)]
1281mod tests {
1282    use std::time::Duration;
1283
1284    use super::*;
1285    use crate::sst::file::FileMeta;
1286
1287    #[test]
1288    fn test_is_valid_region_edit() {
1289        // Valid: has only "files_to_add"
1290        let edit = RegionEdit {
1291            files_to_add: vec![FileMeta::default()],
1292            files_to_remove: vec![],
1293            timestamp_ms: None,
1294            compaction_time_window: None,
1295            flushed_entry_id: None,
1296            flushed_sequence: None,
1297            committed_sequence: None,
1298        };
1299        assert!(is_valid_region_edit(&edit));
1300
1301        // Invalid: "files_to_add" is empty
1302        let edit = RegionEdit {
1303            files_to_add: vec![],
1304            files_to_remove: vec![],
1305            timestamp_ms: None,
1306            compaction_time_window: None,
1307            flushed_entry_id: None,
1308            flushed_sequence: None,
1309            committed_sequence: None,
1310        };
1311        assert!(!is_valid_region_edit(&edit));
1312
1313        // Invalid: "files_to_remove" is not empty
1314        let edit = RegionEdit {
1315            files_to_add: vec![FileMeta::default()],
1316            files_to_remove: vec![FileMeta::default()],
1317            timestamp_ms: None,
1318            compaction_time_window: None,
1319            flushed_entry_id: None,
1320            flushed_sequence: None,
1321            committed_sequence: None,
1322        };
1323        assert!(!is_valid_region_edit(&edit));
1324
1325        // Invalid: other fields are not all "None"s
1326        let edit = RegionEdit {
1327            files_to_add: vec![FileMeta::default()],
1328            files_to_remove: vec![],
1329            timestamp_ms: None,
1330            compaction_time_window: Some(Duration::from_secs(1)),
1331            flushed_entry_id: None,
1332            flushed_sequence: None,
1333            committed_sequence: None,
1334        };
1335        assert!(!is_valid_region_edit(&edit));
1336        let edit = RegionEdit {
1337            files_to_add: vec![FileMeta::default()],
1338            files_to_remove: vec![],
1339            timestamp_ms: None,
1340            compaction_time_window: None,
1341            flushed_entry_id: Some(1),
1342            flushed_sequence: None,
1343            committed_sequence: None,
1344        };
1345        assert!(!is_valid_region_edit(&edit));
1346        let edit = RegionEdit {
1347            files_to_add: vec![FileMeta::default()],
1348            files_to_remove: vec![],
1349            timestamp_ms: None,
1350            compaction_time_window: None,
1351            flushed_entry_id: None,
1352            flushed_sequence: Some(1),
1353            committed_sequence: None,
1354        };
1355        assert!(!is_valid_region_edit(&edit));
1356    }
1357}