metric_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod staging;
27mod state;
28mod sync;
29
30use std::any::Any;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33
34use api::region::RegionResponse;
35use async_trait::async_trait;
36use common_error::ext::{BoxedError, ErrorExt};
37use common_error::status_code::StatusCode;
38use common_runtime::RepeatedTask;
39use mito2::engine::MitoEngine;
40pub(crate) use options::IndexOptions;
41use snafu::{OptionExt, ResultExt};
42pub(crate) use state::MetricEngineState;
43use store_api::metadata::RegionMetadataRef;
44use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
45use store_api::region_engine::{
46    BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic,
47    RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
48    SetRegionRoleStateSuccess, SettableRegionRoleState, SyncRegionFromRequest,
49    SyncRegionFromResponse,
50};
51use store_api::region_request::{
52    BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
53};
54use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
55
56use crate::config::EngineConfig;
57use crate::data_region::DataRegion;
58use crate::error::{
59    self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu,
60    UnsupportedRemapManifestsRequestSnafu,
61};
62use crate::metadata_region::MetadataRegion;
63use crate::repeated_task::FlushMetadataRegionTask;
64use crate::row_modifier::RowModifier;
65use crate::utils::{self, get_region_statistic};
66
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Metric Engine
///
/// ## Regions
///
/// Regions in this metric engine have several roles. There is `PhysicalRegion`,
/// which refers to the region that actually stores the data, and `LogicalRegion`,
/// which is "simulated" over physical regions. Each logical region is associated
/// with one physical region group, which is a group of two physical regions.
/// Their relationship is illustrated below:
///
/// ```mermaid
/// erDiagram
///     LogicalRegion ||--o{ PhysicalRegionGroup : corresponds
///     PhysicalRegionGroup ||--|| DataRegion : contains
///     PhysicalRegionGroup ||--|| MetadataRegion : contains
/// ```
///
/// Metric engine uses two region groups. One is for data region
/// ([METRIC_DATA_REGION_GROUP](crate::consts::METRIC_DATA_REGION_GROUP)), and the
/// other is for metadata region ([METRIC_METADATA_REGION_GROUP](crate::consts::METRIC_METADATA_REGION_GROUP)).
/// From the definition of [`RegionId`], we can convert between these two physical
/// region ids easily. Thus in the code base we usually refer to one "physical
/// region id", and convert it to the other one when necessary.
///
/// The logical region, in contrast, is a virtual region. It doesn't have dedicated
/// storage or a region group, only a region id that is allocated by the meta server.
/// Everything else is shared with the other logical regions that are associated
/// with the same physical region group.
///
/// For more documentation about physical regions, please refer to [`MetadataRegion`]
/// and [`DataRegion`].
///
/// ## Operations
///
/// Both physical and logical regions are accessible to users, but the operations
/// they support are different, as listed below:
///
/// | Operations | Logical Region | Physical Region |
/// | ---------- | -------------- | --------------- |
/// |   Create   |       ✅        |        ✅        |
/// |    Drop    |       ✅        |        ❓*       |
/// |   Write    |       ✅        |        ❌        |
/// |    Read    |       ✅        |        ✅        |
/// |   Close    |       ✅        |        ✅        |
/// |    Open    |       ✅        |        ✅        |
/// |   Alter    |       ✅        |        ❓*       |
///
/// *: Drop: A physical region can be dropped only when all related logical regions are dropped.
/// *: Alter: Physical regions only support altering region options.
///
/// ## Internal Columns
///
/// The physical data region contains two internal columns. Note that
/// "internal" here is for the metric engine itself. The mito
/// engine will add its own internal columns to the region as well.
///
/// Their column ids are registered in [`ReservedColumnId`], and their column names are
/// defined in [`DATA_SCHEMA_TSID_COLUMN_NAME`] and [`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`].
///
/// Tsid is generated by hashing all tags. And table id is retrieved from logical region
/// id to distinguish data from different logical tables.
#[derive(Clone)]
pub struct MetricEngine {
    inner: Arc<MetricEngineInner>,
}
133
134#[async_trait]
135impl RegionEngine for MetricEngine {
    /// Returns the name of this engine, i.e. [`METRIC_ENGINE_NAME`].
    fn name(&self) -> &str {
        METRIC_ENGINE_NAME
    }
140
141    async fn handle_batch_open_requests(
142        &self,
143        parallelism: usize,
144        requests: Vec<(RegionId, RegionOpenRequest)>,
145    ) -> Result<BatchResponses, BoxedError> {
146        self.inner
147            .handle_batch_open_requests(parallelism, requests)
148            .await
149            .map_err(BoxedError::new)
150    }
151
152    async fn handle_batch_catchup_requests(
153        &self,
154        parallelism: usize,
155        requests: Vec<(RegionId, RegionCatchupRequest)>,
156    ) -> Result<BatchResponses, BoxedError> {
157        self.inner
158            .handle_batch_catchup_requests(parallelism, requests)
159            .await
160            .map_err(BoxedError::new)
161    }
162
163    async fn handle_batch_ddl_requests(
164        &self,
165        batch_request: BatchRegionDdlRequest,
166    ) -> Result<RegionResponse, BoxedError> {
167        match batch_request {
168            BatchRegionDdlRequest::Create(requests) => {
169                let mut extension_return_value = HashMap::new();
170                let rows = self
171                    .inner
172                    .create_regions(requests, &mut extension_return_value)
173                    .await
174                    .map_err(BoxedError::new)?;
175
176                Ok(RegionResponse {
177                    affected_rows: rows,
178                    extensions: extension_return_value,
179                    metadata: Vec::new(),
180                })
181            }
182            BatchRegionDdlRequest::Alter(requests) => {
183                let mut extension_return_value = HashMap::new();
184                let rows = self
185                    .inner
186                    .alter_regions(requests, &mut extension_return_value)
187                    .await
188                    .map_err(BoxedError::new)?;
189
190                Ok(RegionResponse {
191                    affected_rows: rows,
192                    extensions: extension_return_value,
193                    metadata: Vec::new(),
194                })
195            }
196            BatchRegionDdlRequest::Drop(requests) => {
197                self.handle_requests(
198                    requests
199                        .into_iter()
200                        .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
201                )
202                .await
203            }
204        }
205    }
206
207    /// Handles non-query request to the region. Returns the count of affected rows.
208    async fn handle_request(
209        &self,
210        region_id: RegionId,
211        request: RegionRequest,
212    ) -> Result<RegionResponse, BoxedError> {
213        let mut extension_return_value = HashMap::new();
214
215        let result = match request {
216            RegionRequest::EnterStaging(_) => {
217                if self.inner.is_physical_region(region_id) {
218                    self.handle_enter_staging_request(region_id, request).await
219                } else {
220                    UnsupportedRegionRequestSnafu { request }.fail()
221                }
222            }
223            RegionRequest::ApplyStagingManifest(_) => {
224                if self.inner.is_physical_region(region_id) {
225                    return self.inner.mito.handle_request(region_id, request).await;
226                } else {
227                    UnsupportedRegionRequestSnafu { request }.fail()
228                }
229            }
230            RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
231            RegionRequest::Create(create) => {
232                self.inner
233                    .create_regions(vec![(region_id, create)], &mut extension_return_value)
234                    .await
235            }
236            RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
237            RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
238            RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
239            RegionRequest::Alter(alter) => {
240                self.inner
241                    .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
242                    .await
243            }
244            RegionRequest::Compact(_) => {
245                if self.inner.is_physical_region(region_id) {
246                    self.inner
247                        .mito
248                        .handle_request(region_id, request)
249                        .await
250                        .context(error::MitoFlushOperationSnafu)
251                        .map(|response| response.affected_rows)
252                } else {
253                    UnsupportedRegionRequestSnafu { request }.fail()
254                }
255            }
256            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
257            RegionRequest::BuildIndex(_) => {
258                if self.inner.is_physical_region(region_id) {
259                    self.inner
260                        .mito
261                        .handle_request(region_id, request)
262                        .await
263                        .context(error::MitoFlushOperationSnafu)
264                        .map(|response| response.affected_rows)
265                } else {
266                    UnsupportedRegionRequestSnafu { request }.fail()
267                }
268            }
269            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
270            RegionRequest::Delete(delete) => self.inner.delete_region(region_id, delete).await,
271            RegionRequest::Catchup(_) => {
272                let mut response = self
273                    .inner
274                    .handle_batch_catchup_requests(
275                        1,
276                        vec![(region_id, RegionCatchupRequest::default())],
277                    )
278                    .await
279                    .map_err(BoxedError::new)?;
280                debug_assert_eq!(response.len(), 1);
281                let (resp_region_id, response) = response
282                    .pop()
283                    .context(error::UnexpectedRequestSnafu {
284                        reason: "expected 1 response, but got zero responses",
285                    })
286                    .map_err(BoxedError::new)?;
287                debug_assert_eq!(region_id, resp_region_id);
288                return response;
289            }
290            RegionRequest::BulkInserts(_) => {
291                // todo(hl): find a way to support bulk inserts in metric engine.
292                UnsupportedRegionRequestSnafu { request }.fail()
293            }
294        };
295
296        result.map_err(BoxedError::new).map(|rows| RegionResponse {
297            affected_rows: rows,
298            extensions: extension_return_value,
299            metadata: Vec::new(),
300        })
301    }
302
    /// Handles a scan request against the region and returns a region scanner.
    ///
    /// The body calls `self.handle_query(..)`. Method-call resolution prefers the
    /// inherent `MetricEngine::handle_query` over this trait method, so this
    /// delegates to the inherent implementation rather than recursing.
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.handle_query(region_id, request).await
    }
310
311    async fn get_committed_sequence(
312        &self,
313        region_id: RegionId,
314    ) -> Result<SequenceNumber, BoxedError> {
315        self.inner
316            .get_last_seq_num(region_id)
317            .await
318            .map_err(BoxedError::new)
319    }
320
321    /// Retrieves region's metadata.
322    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
323        self.inner
324            .load_region_metadata(region_id)
325            .await
326            .map_err(BoxedError::new)
327    }
328
329    /// Retrieves region's disk usage.
330    ///
331    /// Note: Returns `None` if it's a logical region.
332    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
333        if self.inner.is_physical_region(region_id) {
334            get_region_statistic(&self.inner.mito, region_id)
335        } else {
336            None
337        }
338    }
339
    /// Stops the engine.
    ///
    /// This is a no-op that always returns `Ok(())`.
    async fn stop(&self) -> Result<(), BoxedError> {
        // don't need to stop the underlying mito engine
        Ok(())
    }
345
346    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
347        // ignore the region not found error
348        for x in [
349            utils::to_metadata_region_id(region_id),
350            utils::to_data_region_id(region_id),
351        ] {
352            if let Err(e) = self.inner.mito.set_region_role(x, role)
353                && e.status_code() != StatusCode::RegionNotFound
354            {
355                return Err(e);
356            }
357        }
358        Ok(())
359    }
360
361    async fn sync_region(
362        &self,
363        region_id: RegionId,
364        request: SyncRegionFromRequest,
365    ) -> Result<SyncRegionFromResponse, BoxedError> {
366        match request {
367            SyncRegionFromRequest::FromManifest(manifest_info) => self
368                .inner
369                .sync_region_from_manifest(region_id, manifest_info)
370                .await
371                .map_err(BoxedError::new),
372            SyncRegionFromRequest::FromRegion {
373                source_region_id,
374                parallelism,
375            } => {
376                if self.inner.is_physical_region(region_id) {
377                    self.inner
378                        .sync_region_from_region(region_id, source_region_id, parallelism)
379                        .await
380                        .map_err(BoxedError::new)
381                } else {
382                    Err(BoxedError::new(
383                        error::UnsupportedSyncRegionFromRequestSnafu { region_id }.build(),
384                    ))
385                }
386            }
387        }
388    }
389
390    async fn remap_manifests(
391        &self,
392        request: RemapManifestsRequest,
393    ) -> Result<RemapManifestsResponse, BoxedError> {
394        let region_id = request.region_id;
395        if self.inner.is_physical_region(region_id) {
396            self.inner.mito.remap_manifests(request).await
397        } else {
398            Err(BoxedError::new(
399                UnsupportedRemapManifestsRequestSnafu { region_id }.build(),
400            ))
401        }
402    }
403
404    async fn set_region_role_state_gracefully(
405        &self,
406        region_id: RegionId,
407        region_role_state: SettableRegionRoleState,
408    ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
409        let metadata_result = match self
410            .inner
411            .mito
412            .set_region_role_state_gracefully(
413                utils::to_metadata_region_id(region_id),
414                region_role_state,
415            )
416            .await?
417        {
418            SetRegionRoleStateResponse::Success(success) => success,
419            SetRegionRoleStateResponse::NotFound => {
420                return Ok(SetRegionRoleStateResponse::NotFound);
421            }
422            SetRegionRoleStateResponse::InvalidTransition(error) => {
423                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
424            }
425        };
426
427        let data_result = match self
428            .inner
429            .mito
430            .set_region_role_state_gracefully(region_id, region_role_state)
431            .await?
432        {
433            SetRegionRoleStateResponse::Success(success) => success,
434            SetRegionRoleStateResponse::NotFound => {
435                return Ok(SetRegionRoleStateResponse::NotFound);
436            }
437            SetRegionRoleStateResponse::InvalidTransition(error) => {
438                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
439            }
440        };
441
442        Ok(SetRegionRoleStateResponse::success(
443            SetRegionRoleStateSuccess::metric(
444                data_result.last_entry_id().unwrap_or_default(),
445                metadata_result.last_entry_id().unwrap_or_default(),
446            ),
447        ))
448    }
449
450    /// Returns the physical region role.
451    ///
452    /// Note: Returns `None` if it's a logical region.
453    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
454        if self.inner.is_physical_region(region_id) {
455            self.inner.mito.role(region_id)
456        } else {
457            None
458        }
459    }
460
    /// Returns this engine as a `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
464}
465
466impl MetricEngine {
467    pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
468        let metadata_region = MetadataRegion::new(mito.clone());
469        let data_region = DataRegion::new(mito.clone());
470        let state = Arc::new(RwLock::default());
471        config.sanitize();
472        let flush_interval = config.flush_metadata_region_interval;
473        let inner = Arc::new(MetricEngineInner {
474            mito: mito.clone(),
475            metadata_region,
476            data_region,
477            state: state.clone(),
478            config,
479            row_modifier: RowModifier::default(),
480            flush_task: RepeatedTask::new(
481                flush_interval,
482                Box::new(FlushMetadataRegionTask {
483                    state: state.clone(),
484                    mito: mito.clone(),
485                }),
486            ),
487        });
488        inner
489            .flush_task
490            .start(common_runtime::global_runtime())
491            .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
492        Ok(Self { inner })
493    }
494
    /// Returns a clone of the underlying mito engine.
    pub fn mito(&self) -> MitoEngine {
        self.inner.mito.clone()
    }
498
499    /// Returns all logical regions associated with the physical region.
500    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
501        self.inner
502            .metadata_region
503            .logical_regions(physical_region_id)
504            .await
505    }
506
507    /// Handles substrait query and return a stream of record batches
508    async fn handle_query(
509        &self,
510        region_id: RegionId,
511        request: ScanRequest,
512    ) -> Result<RegionScannerRef, BoxedError> {
513        self.inner
514            .read_region(region_id, request)
515            .await
516            .map_err(BoxedError::new)
517    }
518
519    async fn handle_requests(
520        &self,
521        requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
522    ) -> Result<RegionResponse, BoxedError> {
523        let mut affected_rows = 0;
524        let mut extensions = HashMap::new();
525        for (region_id, request) in requests {
526            let response = self.handle_request(region_id, request).await?;
527            affected_rows += response.affected_rows;
528            extensions.extend(response.extensions);
529        }
530
531        Ok(RegionResponse {
532            affected_rows,
533            extensions,
534            metadata: Vec::new(),
535        })
536    }
537}
538
/// Test-only helpers exposing internals of the engine.
#[cfg(test)]
impl MetricEngine {
    /// Scans the region and returns the result as a record batch stream.
    ///
    /// Delegates to the inner engine's `scan_to_stream`.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        self.inner.scan_to_stream(region_id, request).await
    }

    /// Returns the configuration of the engine.
    pub fn config(&self) -> &EngineConfig {
        &self.inner.config
    }
}
554
/// The state behind [`MetricEngine`]; it is held in an `Arc` so that clones of the
/// engine share it.
struct MetricEngineInner {
    /// The underlying mito engine that requests on physical regions are forwarded to.
    mito: MitoEngine,
    /// Metadata region accessor, built from a clone of `mito`.
    metadata_region: MetadataRegion,
    /// Data region accessor, built from a clone of `mito`.
    data_region: DataRegion,
    /// Engine state; the same `Arc` is also handed to the metadata flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration, sanitized when the engine is created.
    config: EngineConfig,
    /// Default-constructed when the engine is created; its use is not visible in
    /// this file.
    row_modifier: RowModifier,
    /// Repeated task running `FlushMetadataRegionTask`, started when the engine is
    /// created.
    flush_task: RepeatedTask<Error>,
}
564
565#[cfg(test)]
566mod test {
567    use std::collections::HashMap;
568
569    use common_telemetry::info;
570    use common_wal::options::{KafkaWalOptions, WalOptions};
571    use mito2::sst::location::region_dir_from_table_dir;
572    use mito2::test_util::{kafka_log_store_factory, prepare_test_for_kafka_log_store};
573    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
574    use store_api::mito_engine_options::WAL_OPTIONS_KEY;
575    use store_api::region_request::{
576        PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
577    };
578
579    use super::*;
580    use crate::maybe_skip_kafka_log_store_integration_test;
581    use crate::test_util::TestEnv;
582
583    #[tokio::test]
584    async fn close_open_regions() {
585        let env = TestEnv::new().await;
586        env.init_metric_region().await;
587        let engine = env.metric();
588
589        // close physical region
590        let physical_region_id = env.default_physical_region_id();
591        engine
592            .handle_request(
593                physical_region_id,
594                RegionRequest::Close(RegionCloseRequest {}),
595            )
596            .await
597            .unwrap();
598
599        // reopen physical region
600        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
601            .into_iter()
602            .collect();
603        let open_request = RegionOpenRequest {
604            engine: METRIC_ENGINE_NAME.to_string(),
605            table_dir: TestEnv::default_table_dir(),
606            path_type: PathType::Bare, // Use Bare path type for engine regions
607            options: physical_region_option,
608            skip_wal_replay: false,
609            checkpoint: None,
610        };
611        engine
612            .handle_request(physical_region_id, RegionRequest::Open(open_request))
613            .await
614            .unwrap();
615
616        // close nonexistent region won't report error
617        let nonexistent_region_id = RegionId::new(12313, 12);
618        engine
619            .handle_request(
620                nonexistent_region_id,
621                RegionRequest::Close(RegionCloseRequest {}),
622            )
623            .await
624            .unwrap();
625
626        // open nonexistent region won't report error
627        let invalid_open_request = RegionOpenRequest {
628            engine: METRIC_ENGINE_NAME.to_string(),
629            table_dir: TestEnv::default_table_dir(),
630            path_type: PathType::Bare, // Use Bare path type for engine regions
631            options: HashMap::new(),
632            skip_wal_replay: false,
633            checkpoint: None,
634        };
635        engine
636            .handle_request(
637                nonexistent_region_id,
638                RegionRequest::Open(invalid_open_request),
639            )
640            .await
641            .unwrap();
642    }
643
644    #[tokio::test]
645    async fn test_role() {
646        let env = TestEnv::new().await;
647        env.init_metric_region().await;
648
649        let logical_region_id = env.default_logical_region_id();
650        let physical_region_id = env.default_physical_region_id();
651
652        assert!(env.metric().role(logical_region_id).is_none());
653        assert!(env.metric().role(physical_region_id).is_some());
654    }
655
656    #[tokio::test]
657    async fn test_region_disk_usage() {
658        let env = TestEnv::new().await;
659        env.init_metric_region().await;
660
661        let logical_region_id = env.default_logical_region_id();
662        let physical_region_id = env.default_physical_region_id();
663
664        assert!(env.metric().region_statistic(logical_region_id).is_none());
665        assert!(env.metric().region_statistic(physical_region_id).is_some());
666    }
667
    /// Verifies that a failed open of a physical region leaves neither the data
    /// region nor the metadata region open in the mito engine.
    ///
    /// Steps: flush the physical region, delete the parquet files listed under the
    /// metadata region directory, check that opening the still-open region succeeds,
    /// close it, then expect the reopen to fail with `StorageUnavailable` and both
    /// underlying regions to report `RegionNotFound`.
    #[tokio::test]
    async fn test_open_region_failure() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();

        let metric_engine = env.metric();
        // Flush first; presumably this persists parquet files that are deleted below.
        metric_engine
            .handle_request(
                physical_region_id,
                RegionRequest::Flush(RegionFlushRequest {
                    row_group_size: None,
                }),
            )
            .await
            .unwrap();

        // Directory of the metadata region that belongs to the physical region.
        let path = region_dir_from_table_dir(
            &TestEnv::default_table_dir(),
            physical_region_id,
            PathType::Metadata,
        );
        let object_store = env.get_object_store().unwrap();
        let list = object_store.list(&path).await.unwrap();
        // Delete parquet files in metadata region
        for entry in list {
            if entry.metadata().is_dir() {
                continue;
            }
            if entry.name().ends_with("parquet") {
                info!("deleting {}", entry.path());
                object_store.delete(entry.path()).await.unwrap();
            }
        }

        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        // Opening an already opened region should succeed.
        // Since the region is already open, no metadata recovery operations will be performed.
        metric_engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap();

        // Close the region
        metric_engine
            .handle_request(
                physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // Try to reopen region.
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        let err = metric_engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap_err();
        // Failed to open region because of missing parquet files.
        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);

        let mito_engine = metric_engine.mito();
        let data_region_id = utils::to_data_region_id(physical_region_id);
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        // The metadata/data region should be closed.
        let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
        let err = mito_engine
            .get_metadata(metadata_region_id)
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
    }
761
    /// Kafka-backed integration test for batch open followed by batch catchup.
    ///
    /// It creates several physical regions (each with a set of logical regions)
    /// spread over a few topics, closes everything, reopens the physical regions
    /// with `skip_wal_replay: true`, and checks that the logical regions are absent
    /// from the engine state until the batch catchup has run.
    ///
    /// The test may be skipped by `maybe_skip_kafka_log_store_integration_test!`.
    #[tokio::test]
    async fn test_catchup_regions() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_log_store_integration_test!();
        let kafka_log_store_factory = kafka_log_store_factory().unwrap();
        let mito_env = mito2::test_util::TestEnv::new()
            .await
            .with_log_store_factory(kafka_log_store_factory.clone());
        let env = TestEnv::with_mito_env(mito_env).await;
        // Every physical region gets its own table dir.
        let table_dir = |region_id| format!("table/{region_id}");
        let mut physical_region_ids = vec![];
        let mut logical_region_ids = vec![];

        let num_topics = 3;
        let num_physical_regions = 8;
        let num_logical_regions = 16;
        let parallelism = 2;
        let mut topics = Vec::with_capacity(num_topics);
        for _ in 0..num_topics {
            let topic = prepare_test_for_kafka_log_store(&kafka_log_store_factory)
                .await
                .unwrap();
            topics.push(topic);
        }

        // Maps a region index to one of the prepared topics (round-robin).
        let topic_idx = |id| (id as usize) % num_topics;
        // Creates physical regions
        for i in 0..num_physical_regions {
            let physical_region_id = RegionId::new(1, i);
            physical_region_ids.push(physical_region_id);

            let wal_options = WalOptions::Kafka(KafkaWalOptions {
                topic: topics[topic_idx(i)].clone(),
            });
            env.create_physical_region(
                physical_region_id,
                &table_dir(physical_region_id),
                vec![(
                    WAL_OPTIONS_KEY.to_string(),
                    serde_json::to_string(&wal_options).unwrap(),
                )],
            )
            .await;
            // Creates logical regions for each physical region
            for j in 0..num_logical_regions {
                let logical_region_id = RegionId::new(1024 + i, j);
                logical_region_ids.push(logical_region_id);
                env.create_logical_region(physical_region_id, logical_region_id)
                    .await;
            }
        }

        let metric_engine = env.metric();
        // Closes all regions
        for region_id in logical_region_ids.iter().chain(physical_region_ids.iter()) {
            metric_engine
                .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
                .await
                .unwrap();
        }

        // Opens all regions and skip the wal
        let requests = physical_region_ids
            .iter()
            .enumerate()
            .map(|(idx, region_id)| {
                let mut options = HashMap::new();
                // `idx` matches the `i` used at creation time, so the same topic is picked.
                let wal_options = WalOptions::Kafka(KafkaWalOptions {
                    topic: topics[topic_idx(idx as u32)].clone(),
                });
                options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new());
                options.insert(
                    WAL_OPTIONS_KEY.to_string(),
                    serde_json::to_string(&wal_options).unwrap(),
                );
                (
                    *region_id,
                    RegionOpenRequest {
                        engine: METRIC_ENGINE_NAME.to_string(),
                        table_dir: table_dir(*region_id),
                        path_type: PathType::Bare,
                        options: options.clone(),
                        skip_wal_replay: true,
                        checkpoint: None,
                    },
                )
            })
            .collect::<Vec<_>>();
        info!("Open batch regions with parallelism: {parallelism}");
        metric_engine
            .handle_batch_open_requests(parallelism, requests)
            .await
            .unwrap();
        // After opening with WAL replay skipped, no logical region is expected in the state.
        {
            let state = metric_engine.inner.state.read().unwrap();
            for logical_region in &logical_region_ids {
                assert!(!state.logical_regions().contains_key(logical_region));
            }
        }

        let catch_requests = physical_region_ids
            .iter()
            .map(|region_id| {
                (
                    *region_id,
                    RegionCatchupRequest {
                        set_writable: true,
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        metric_engine
            .handle_batch_catchup_requests(parallelism, catch_requests)
            .await
            .unwrap();
        // After catchup, every logical region is expected to be present in the state.
        {
            let state = metric_engine.inner.state.read().unwrap();
            for logical_region in &logical_region_ids {
                assert!(state.logical_regions().contains_key(logical_region));
            }
        }
    }
885}