metric_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod staging;
27mod state;
28mod sync;
29
30use std::any::Any;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33
34use api::region::RegionResponse;
35use async_trait::async_trait;
36use common_error::ext::{BoxedError, ErrorExt};
37use common_error::status_code::StatusCode;
38use common_runtime::RepeatedTask;
39use mito2::engine::MitoEngine;
40pub(crate) use options::IndexOptions;
41use snafu::{OptionExt, ResultExt};
42pub(crate) use state::MetricEngineState;
43use store_api::metadata::RegionMetadataRef;
44use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
45use store_api::region_engine::{
46    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
47    RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
48    SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse,
49};
50use store_api::region_request::{
51    BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
52};
53use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
54
55use crate::config::EngineConfig;
56use crate::data_region::DataRegion;
57use crate::error::{
58    self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu,
59    UnsupportedRemapManifestsRequestSnafu,
60};
61use crate::metadata_region::MetadataRegion;
62use crate::repeated_task::FlushMetadataRegionTask;
63use crate::row_modifier::RowModifier;
64use crate::utils::{self, get_region_statistic};
65
66#[cfg_attr(doc, aquamarine::aquamarine)]
67/// # Metric Engine
68///
69/// ## Regions
70///
71/// Regions in this metric engine has several roles. There is `PhysicalRegion`,
72/// which refer to the region that actually stores the data. And `LogicalRegion`
73/// that is "simulated" over physical regions. Each logical region is associated
74/// with one physical region group, which is a group of two physical regions.
75/// Their relationship is illustrated below:
76///
77/// ```mermaid
78/// erDiagram
79///     LogicalRegion ||--o{ PhysicalRegionGroup : corresponds
80///     PhysicalRegionGroup ||--|| DataRegion : contains
81///     PhysicalRegionGroup ||--|| MetadataRegion : contains
82/// ```
83///
84/// Metric engine uses two region groups. One is for data region
85/// ([METRIC_DATA_REGION_GROUP](crate::consts::METRIC_DATA_REGION_GROUP)), and the
86/// other is for metadata region ([METRIC_METADATA_REGION_GROUP](crate::consts::METRIC_METADATA_REGION_GROUP)).
87/// From the definition of [`RegionId`], we can convert between these two physical
88/// region ids easily. Thus in the code base we usually refer to one "physical
89/// region id", and convert it to the other one when necessary.
90///
91/// The logical region, in contrast, is a virtual region. It doesn't has dedicated
92/// storage or region group. Only a region id that is allocated by meta server.
93/// And all other things is shared with other logical region that are associated
94/// with the same physical region group.
95///
96/// For more document about physical regions, please refer to [`MetadataRegion`]
97/// and [`DataRegion`].
98///
99/// ## Operations
100///
101/// Both physical and logical region are accessible to user. But the operation
102/// they support are different. List below:
103///
104/// | Operations | Logical Region | Physical Region |
105/// | ---------- | -------------- | --------------- |
106/// |   Create   |       ✅        |        ✅        |
107/// |    Drop    |       ✅        |        ❓*       |
108/// |   Write    |       ✅        |        ❌        |
109/// |    Read    |       ✅        |        ✅        |
110/// |   Close    |       ✅        |        ✅        |
111/// |    Open    |       ✅        |        ✅        |
112/// |   Alter    |       ✅        |        ❓*       |
113///
114/// *: Physical region can be dropped only when all related logical regions are dropped.
115/// *: Alter: Physical regions only support altering region options.
116///
117/// ## Internal Columns
118///
119/// The physical data region contains two internal columns. Should
120/// mention that "internal" here is for metric engine itself. Mito
121/// engine will add it's internal columns to the region as well.
122///
123/// Their column id is registered in [`ReservedColumnId`]. And column name is
124/// defined in [`DATA_SCHEMA_TSID_COLUMN_NAME`] and [`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`].
125///
126/// Tsid is generated by hashing all tags. And table id is retrieved from logical region
127/// id to distinguish data from different logical tables.
128#[derive(Clone)]
129pub struct MetricEngine {
130    inner: Arc<MetricEngineInner>,
131}
132
133#[async_trait]
134impl RegionEngine for MetricEngine {
135    /// Name of this engine
136    fn name(&self) -> &str {
137        METRIC_ENGINE_NAME
138    }
139
140    async fn handle_batch_open_requests(
141        &self,
142        parallelism: usize,
143        requests: Vec<(RegionId, RegionOpenRequest)>,
144    ) -> Result<BatchResponses, BoxedError> {
145        self.inner
146            .handle_batch_open_requests(parallelism, requests)
147            .await
148            .map_err(BoxedError::new)
149    }
150
151    async fn handle_batch_catchup_requests(
152        &self,
153        parallelism: usize,
154        requests: Vec<(RegionId, RegionCatchupRequest)>,
155    ) -> Result<BatchResponses, BoxedError> {
156        self.inner
157            .handle_batch_catchup_requests(parallelism, requests)
158            .await
159            .map_err(BoxedError::new)
160    }
161
162    async fn handle_batch_ddl_requests(
163        &self,
164        batch_request: BatchRegionDdlRequest,
165    ) -> Result<RegionResponse, BoxedError> {
166        match batch_request {
167            BatchRegionDdlRequest::Create(requests) => {
168                let mut extension_return_value = HashMap::new();
169                let rows = self
170                    .inner
171                    .create_regions(requests, &mut extension_return_value)
172                    .await
173                    .map_err(BoxedError::new)?;
174
175                Ok(RegionResponse {
176                    affected_rows: rows,
177                    extensions: extension_return_value,
178                    metadata: Vec::new(),
179                })
180            }
181            BatchRegionDdlRequest::Alter(requests) => {
182                let mut extension_return_value = HashMap::new();
183                let rows = self
184                    .inner
185                    .alter_regions(requests, &mut extension_return_value)
186                    .await
187                    .map_err(BoxedError::new)?;
188
189                Ok(RegionResponse {
190                    affected_rows: rows,
191                    extensions: extension_return_value,
192                    metadata: Vec::new(),
193                })
194            }
195            BatchRegionDdlRequest::Drop(requests) => {
196                self.handle_requests(
197                    requests
198                        .into_iter()
199                        .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
200                )
201                .await
202            }
203        }
204    }
205
206    /// Handles non-query request to the region. Returns the count of affected rows.
207    async fn handle_request(
208        &self,
209        region_id: RegionId,
210        request: RegionRequest,
211    ) -> Result<RegionResponse, BoxedError> {
212        let mut extension_return_value = HashMap::new();
213
214        let result = match request {
215            RegionRequest::EnterStaging(_) => {
216                if self.inner.is_physical_region(region_id) {
217                    self.handle_enter_staging_request(region_id, request).await
218                } else {
219                    UnsupportedRegionRequestSnafu { request }.fail()
220                }
221            }
222            RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
223            RegionRequest::Create(create) => {
224                self.inner
225                    .create_regions(vec![(region_id, create)], &mut extension_return_value)
226                    .await
227            }
228            RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
229            RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
230            RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
231            RegionRequest::Alter(alter) => {
232                self.inner
233                    .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
234                    .await
235            }
236            RegionRequest::Compact(_) => {
237                if self.inner.is_physical_region(region_id) {
238                    self.inner
239                        .mito
240                        .handle_request(region_id, request)
241                        .await
242                        .context(error::MitoFlushOperationSnafu)
243                        .map(|response| response.affected_rows)
244                } else {
245                    UnsupportedRegionRequestSnafu { request }.fail()
246                }
247            }
248            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
249            RegionRequest::BuildIndex(_) => {
250                if self.inner.is_physical_region(region_id) {
251                    self.inner
252                        .mito
253                        .handle_request(region_id, request)
254                        .await
255                        .context(error::MitoFlushOperationSnafu)
256                        .map(|response| response.affected_rows)
257                } else {
258                    UnsupportedRegionRequestSnafu { request }.fail()
259                }
260            }
261            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
262            RegionRequest::Delete(delete) => self.inner.delete_region(region_id, delete).await,
263            RegionRequest::Catchup(_) => {
264                let mut response = self
265                    .inner
266                    .handle_batch_catchup_requests(
267                        1,
268                        vec![(region_id, RegionCatchupRequest::default())],
269                    )
270                    .await
271                    .map_err(BoxedError::new)?;
272                debug_assert_eq!(response.len(), 1);
273                let (resp_region_id, response) = response
274                    .pop()
275                    .context(error::UnexpectedRequestSnafu {
276                        reason: "expected 1 response, but got zero responses",
277                    })
278                    .map_err(BoxedError::new)?;
279                debug_assert_eq!(region_id, resp_region_id);
280                return response;
281            }
282            RegionRequest::BulkInserts(_) => {
283                // todo(hl): find a way to support bulk inserts in metric engine.
284                UnsupportedRegionRequestSnafu { request }.fail()
285            }
286        };
287
288        result.map_err(BoxedError::new).map(|rows| RegionResponse {
289            affected_rows: rows,
290            extensions: extension_return_value,
291            metadata: Vec::new(),
292        })
293    }
294
295    async fn handle_query(
296        &self,
297        region_id: RegionId,
298        request: ScanRequest,
299    ) -> Result<RegionScannerRef, BoxedError> {
300        self.handle_query(region_id, request).await
301    }
302
303    async fn get_committed_sequence(
304        &self,
305        region_id: RegionId,
306    ) -> Result<SequenceNumber, BoxedError> {
307        self.inner
308            .get_last_seq_num(region_id)
309            .await
310            .map_err(BoxedError::new)
311    }
312
313    /// Retrieves region's metadata.
314    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
315        self.inner
316            .load_region_metadata(region_id)
317            .await
318            .map_err(BoxedError::new)
319    }
320
321    /// Retrieves region's disk usage.
322    ///
323    /// Note: Returns `None` if it's a logical region.
324    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
325        if self.inner.is_physical_region(region_id) {
326            get_region_statistic(&self.inner.mito, region_id)
327        } else {
328            None
329        }
330    }
331
332    /// Stops the engine
333    async fn stop(&self) -> Result<(), BoxedError> {
334        // don't need to stop the underlying mito engine
335        Ok(())
336    }
337
338    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
339        // ignore the region not found error
340        for x in [
341            utils::to_metadata_region_id(region_id),
342            utils::to_data_region_id(region_id),
343        ] {
344            if let Err(e) = self.inner.mito.set_region_role(x, role)
345                && e.status_code() != StatusCode::RegionNotFound
346            {
347                return Err(e);
348            }
349        }
350        Ok(())
351    }
352
353    async fn sync_region(
354        &self,
355        region_id: RegionId,
356        manifest_info: RegionManifestInfo,
357    ) -> Result<SyncManifestResponse, BoxedError> {
358        self.inner
359            .sync_region(region_id, manifest_info)
360            .await
361            .map_err(BoxedError::new)
362    }
363
364    async fn remap_manifests(
365        &self,
366        request: RemapManifestsRequest,
367    ) -> Result<RemapManifestsResponse, BoxedError> {
368        let region_id = request.region_id;
369        if self.inner.is_physical_region(region_id) {
370            self.inner.mito.remap_manifests(request).await
371        } else {
372            Err(BoxedError::new(
373                UnsupportedRemapManifestsRequestSnafu { region_id }.build(),
374            ))
375        }
376    }
377
378    async fn set_region_role_state_gracefully(
379        &self,
380        region_id: RegionId,
381        region_role_state: SettableRegionRoleState,
382    ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
383        let metadata_result = match self
384            .inner
385            .mito
386            .set_region_role_state_gracefully(
387                utils::to_metadata_region_id(region_id),
388                region_role_state,
389            )
390            .await?
391        {
392            SetRegionRoleStateResponse::Success(success) => success,
393            SetRegionRoleStateResponse::NotFound => {
394                return Ok(SetRegionRoleStateResponse::NotFound);
395            }
396            SetRegionRoleStateResponse::InvalidTransition(error) => {
397                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
398            }
399        };
400
401        let data_result = match self
402            .inner
403            .mito
404            .set_region_role_state_gracefully(region_id, region_role_state)
405            .await?
406        {
407            SetRegionRoleStateResponse::Success(success) => success,
408            SetRegionRoleStateResponse::NotFound => {
409                return Ok(SetRegionRoleStateResponse::NotFound);
410            }
411            SetRegionRoleStateResponse::InvalidTransition(error) => {
412                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
413            }
414        };
415
416        Ok(SetRegionRoleStateResponse::success(
417            SetRegionRoleStateSuccess::metric(
418                data_result.last_entry_id().unwrap_or_default(),
419                metadata_result.last_entry_id().unwrap_or_default(),
420            ),
421        ))
422    }
423
424    /// Returns the physical region role.
425    ///
426    /// Note: Returns `None` if it's a logical region.
427    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
428        if self.inner.is_physical_region(region_id) {
429            self.inner.mito.role(region_id)
430        } else {
431            None
432        }
433    }
434
435    fn as_any(&self) -> &dyn Any {
436        self
437    }
438}
439
440impl MetricEngine {
441    pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
442        let metadata_region = MetadataRegion::new(mito.clone());
443        let data_region = DataRegion::new(mito.clone());
444        let state = Arc::new(RwLock::default());
445        config.sanitize();
446        let flush_interval = config.flush_metadata_region_interval;
447        let inner = Arc::new(MetricEngineInner {
448            mito: mito.clone(),
449            metadata_region,
450            data_region,
451            state: state.clone(),
452            config,
453            row_modifier: RowModifier::default(),
454            flush_task: RepeatedTask::new(
455                flush_interval,
456                Box::new(FlushMetadataRegionTask {
457                    state: state.clone(),
458                    mito: mito.clone(),
459                }),
460            ),
461        });
462        inner
463            .flush_task
464            .start(common_runtime::global_runtime())
465            .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
466        Ok(Self { inner })
467    }
468
469    pub fn mito(&self) -> MitoEngine {
470        self.inner.mito.clone()
471    }
472
473    /// Returns all logical regions associated with the physical region.
474    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
475        self.inner
476            .metadata_region
477            .logical_regions(physical_region_id)
478            .await
479    }
480
481    /// Handles substrait query and return a stream of record batches
482    async fn handle_query(
483        &self,
484        region_id: RegionId,
485        request: ScanRequest,
486    ) -> Result<RegionScannerRef, BoxedError> {
487        self.inner
488            .read_region(region_id, request)
489            .await
490            .map_err(BoxedError::new)
491    }
492
493    async fn handle_requests(
494        &self,
495        requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
496    ) -> Result<RegionResponse, BoxedError> {
497        let mut affected_rows = 0;
498        let mut extensions = HashMap::new();
499        for (region_id, request) in requests {
500            let response = self.handle_request(region_id, request).await?;
501            affected_rows += response.affected_rows;
502            extensions.extend(response.extensions);
503        }
504
505        Ok(RegionResponse {
506            affected_rows,
507            extensions,
508            metadata: Vec::new(),
509        })
510    }
511}
512
#[cfg(test)]
impl MetricEngine {
    /// Test-only helper: scans `region_id` with `request` through the inner engine
    /// and returns the result as a record batch stream.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        let engine = &self.inner;
        engine.scan_to_stream(region_id, request).await
    }

    /// Returns the configuration of the engine.
    pub fn config(&self) -> &EngineConfig {
        let engine: &MetricEngineInner = &self.inner;
        &engine.config
    }
}
528
/// Internals shared by every clone of [`MetricEngine`] (held behind an `Arc`).
struct MetricEngineInner {
    /// The underlying mito engine; both metadata and data regions are built on it.
    mito: MitoEngine,
    /// Accessor for metadata regions, constructed from `mito` in `MetricEngine::try_new`.
    metadata_region: MetadataRegion,
    /// Accessor for data regions, constructed from `mito` in `MetricEngine::try_new`.
    data_region: DataRegion,
    /// In-memory engine state, shared with the background metadata flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration, sanitized in `MetricEngine::try_new` before being stored.
    config: EngineConfig,
    /// Row modifier used by the engine; presumably rewrites rows into the physical
    /// layout before writing — TODO confirm.
    row_modifier: RowModifier,
    /// Repeated task (started in `MetricEngine::try_new` on the global runtime) that
    /// runs `FlushMetadataRegionTask` every `flush_metadata_region_interval`.
    flush_task: RepeatedTask<Error>,
}
538
/// Integration tests for [`MetricEngine`] region lifecycle handling.
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use common_telemetry::info;
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use mito2::sst::location::region_dir_from_table_dir;
    use mito2::test_util::{kafka_log_store_factory, prepare_test_for_kafka_log_store};
    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
    use store_api::mito_engine_options::WAL_OPTIONS_KEY;
    use store_api::region_request::{
        PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
    };

    use super::*;
    use crate::maybe_skip_kafka_log_store_integration_test;
    use crate::test_util::TestEnv;

    /// Closes and reopens the default physical region, then checks that closing and
    /// opening a nonexistent region both succeed without error.
    #[tokio::test]
    async fn close_open_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // close physical region
        let physical_region_id = env.default_physical_region_id();
        engine
            .handle_request(
                physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // reopen physical region; the PHYSICAL_TABLE_METADATA_KEY option marks it
        // as a physical region
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare, // Use Bare path type for engine regions
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap();

        // closing a nonexistent region doesn't report an error
        let nonexistent_region_id = RegionId::new(12313, 12);
        engine
            .handle_request(
                nonexistent_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // opening a nonexistent region (without physical-table options) doesn't
        // report an error either
        let invalid_open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare, // Use Bare path type for engine regions
            options: HashMap::new(),
            skip_wal_replay: false,
            checkpoint: None,
        };
        engine
            .handle_request(
                nonexistent_region_id,
                RegionRequest::Open(invalid_open_request),
            )
            .await
            .unwrap();
    }

    /// `role` is only reported for physical regions.
    #[tokio::test]
    async fn test_role() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();

        assert!(env.metric().role(logical_region_id).is_none());
        assert!(env.metric().role(physical_region_id).is_some());
    }

    /// `region_statistic` is only reported for physical regions.
    #[tokio::test]
    async fn test_region_disk_usage() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();

        assert!(env.metric().region_statistic(logical_region_id).is_none());
        assert!(env.metric().region_statistic(physical_region_id).is_some());
    }

    /// Flushes the physical region, deletes the metadata region's parquet files, and
    /// checks that a later reopen fails with `StorageUnavailable` while leaving both
    /// the data and metadata regions closed in mito.
    #[tokio::test]
    async fn test_open_region_failure() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();

        // Flush first so the metadata region has parquet files on storage.
        let metric_engine = env.metric();
        metric_engine
            .handle_request(
                physical_region_id,
                RegionRequest::Flush(RegionFlushRequest {
                    row_group_size: None,
                }),
            )
            .await
            .unwrap();

        let path = region_dir_from_table_dir(
            &TestEnv::default_table_dir(),
            physical_region_id,
            PathType::Metadata,
        );
        let object_store = env.get_object_store().unwrap();
        let list = object_store.list(&path).await.unwrap();
        // Delete parquet files in metadata region
        for entry in list {
            if entry.metadata().is_dir() {
                continue;
            }
            if entry.name().ends_with("parquet") {
                info!("deleting {}", entry.path());
                object_store.delete(entry.path()).await.unwrap();
            }
        }

        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        // Opening an already opened region should succeed.
        // Since the region is already open, no metadata recovery operations will be performed.
        metric_engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap();

        // Close the region
        metric_engine
            .handle_request(
                physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // Try to reopen region.
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        let err = metric_engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap_err();
        // Failed to open region because of missing parquet files.
        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);

        let mito_engine = metric_engine.mito();
        let data_region_id = utils::to_data_region_id(physical_region_id);
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        // The metadata/data region should be closed.
        let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
        let err = mito_engine
            .get_metadata(metadata_region_id)
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
    }

    /// Kafka-backed test (skipped when the Kafka integration environment is absent).
    ///
    /// Creates physical regions spread over several topics, each with logical
    /// regions, then closes them all and reopens the physical ones with WAL replay
    /// skipped. It checks that logical regions are absent from the engine state
    /// until a batch catchup (with `set_writable: true`) restores them.
    #[tokio::test]
    async fn test_catchup_regions() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_log_store_integration_test!();
        let kafka_log_store_factory = kafka_log_store_factory().unwrap();
        let mito_env = mito2::test_util::TestEnv::new()
            .await
            .with_log_store_factory(kafka_log_store_factory.clone());
        let env = TestEnv::with_mito_env(mito_env).await;
        let table_dir = |region_id| format!("table/{region_id}");
        let mut physical_region_ids = vec![];
        let mut logical_region_ids = vec![];

        let num_topics = 3;
        let num_physical_regions = 8;
        let num_logical_regions = 16;
        let parallelism = 2;
        let mut topics = Vec::with_capacity(num_topics);
        for _ in 0..num_topics {
            let topic = prepare_test_for_kafka_log_store(&kafka_log_store_factory)
                .await
                .unwrap();
            topics.push(topic);
        }

        // Physical regions are assigned to topics round-robin by region number.
        let topic_idx = |id| (id as usize) % num_topics;
        // Creates physical regions
        for i in 0..num_physical_regions {
            let physical_region_id = RegionId::new(1, i);
            physical_region_ids.push(physical_region_id);

            let wal_options = WalOptions::Kafka(KafkaWalOptions {
                topic: topics[topic_idx(i)].clone(),
            });
            env.create_physical_region(
                physical_region_id,
                &table_dir(physical_region_id),
                vec![(
                    WAL_OPTIONS_KEY.to_string(),
                    serde_json::to_string(&wal_options).unwrap(),
                )],
            )
            .await;
            // Creates logical regions for each physical region
            for j in 0..num_logical_regions {
                let logical_region_id = RegionId::new(1024 + i, j);
                logical_region_ids.push(logical_region_id);
                env.create_logical_region(physical_region_id, logical_region_id)
                    .await;
            }
        }

        let metric_engine = env.metric();
        // Closes all regions
        for region_id in logical_region_ids.iter().chain(physical_region_ids.iter()) {
            metric_engine
                .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
                .await
                .unwrap();
        }

        // Opens all regions and skip the wal
        let requests = physical_region_ids
            .iter()
            .enumerate()
            .map(|(idx, region_id)| {
                let mut options = HashMap::new();
                let wal_options = WalOptions::Kafka(KafkaWalOptions {
                    topic: topics[topic_idx(idx as u32)].clone(),
                });
                options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new());
                options.insert(
                    WAL_OPTIONS_KEY.to_string(),
                    serde_json::to_string(&wal_options).unwrap(),
                );
                (
                    *region_id,
                    RegionOpenRequest {
                        engine: METRIC_ENGINE_NAME.to_string(),
                        table_dir: table_dir(*region_id),
                        path_type: PathType::Bare,
                        options: options.clone(),
                        skip_wal_replay: true,
                        checkpoint: None,
                    },
                )
            })
            .collect::<Vec<_>>();
        info!("Open batch regions with parallelism: {parallelism}");
        metric_engine
            .handle_batch_open_requests(parallelism, requests)
            .await
            .unwrap();
        // With WAL replay skipped, no logical region should be known yet.
        {
            let state = metric_engine.inner.state.read().unwrap();
            for logical_region in &logical_region_ids {
                assert!(!state.logical_regions().contains_key(logical_region));
            }
        }

        let catch_requests = physical_region_ids
            .iter()
            .map(|region_id| {
                (
                    *region_id,
                    RegionCatchupRequest {
                        set_writable: true,
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        metric_engine
            .handle_batch_catchup_requests(parallelism, catch_requests)
            .await
            .unwrap();
        // After catchup, every logical region should be restored in the engine state.
        {
            let state = metric_engine.inner.state.read().unwrap();
            for logical_region in &logical_region_ids {
                assert!(state.logical_regions().contains_key(logical_region));
            }
        }
    }
}