Skip to main content

metric_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter;
16mod bulk_insert;
17mod catchup;
18mod close;
19mod create;
20mod drop;
21mod flush;
22mod open;
23mod options;
24mod put;
25mod read;
26mod region_metadata;
27mod staging;
28mod state;
29mod sync;
30
31use std::any::Any;
32use std::collections::HashMap;
33use std::sync::{Arc, RwLock};
34
35use api::region::RegionResponse;
36use async_trait::async_trait;
37use common_error::ext::{BoxedError, ErrorExt};
38use common_error::status_code::StatusCode;
39use common_runtime::RepeatedTask;
40use mito2::engine::MitoEngine;
41pub(crate) use options::IndexOptions;
42use snafu::{OptionExt, ResultExt};
43pub(crate) use state::MetricEngineState;
44use store_api::metadata::RegionMetadataRef;
45use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
46use store_api::region_engine::{
47    BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic,
48    RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
49    SetRegionRoleStateSuccess, SettableRegionRoleState, SyncRegionFromRequest,
50    SyncRegionFromResponse,
51};
52use store_api::region_request::{
53    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionPutRequest,
54    RegionRequest,
55};
56use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
57
58use crate::config::EngineConfig;
59use crate::data_region::DataRegion;
60use crate::error::{
61    self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu,
62    UnsupportedRemapManifestsRequestSnafu,
63};
64use crate::metadata_region::MetadataRegion;
65use crate::repeated_task::FlushMetadataRegionTask;
66use crate::row_modifier::RowModifier;
67use crate::utils::{self, get_region_statistic};
68
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Metric Engine
///
/// ## Regions
///
/// Regions in this metric engine have several roles. There is `PhysicalRegion`,
/// which refers to the region that actually stores the data, and `LogicalRegion`,
/// which is "simulated" over physical regions. Each logical region is associated
/// with one physical region group, which is a group of two physical regions.
/// Their relationship is illustrated below:
///
/// ```mermaid
/// erDiagram
///     LogicalRegion ||--o{ PhysicalRegionGroup : corresponds
///     PhysicalRegionGroup ||--|| DataRegion : contains
///     PhysicalRegionGroup ||--|| MetadataRegion : contains
/// ```
///
/// Metric engine uses two region groups. One is for the data region
/// ([METRIC_DATA_REGION_GROUP](crate::consts::METRIC_DATA_REGION_GROUP)), and the
/// other is for the metadata region ([METRIC_METADATA_REGION_GROUP](crate::consts::METRIC_METADATA_REGION_GROUP)).
/// From the definition of [`RegionId`], we can convert between these two physical
/// region ids easily. Thus in the code base we usually refer to one "physical
/// region id", and convert it to the other one when necessary.
///
/// The logical region, in contrast, is a virtual region. It doesn't have dedicated
/// storage or a region group, only a region id that is allocated by the meta server.
/// Everything else is shared with the other logical regions that are associated
/// with the same physical region group.
///
/// For more documentation about physical regions, please refer to [`MetadataRegion`]
/// and [`DataRegion`].
///
/// ## Operations
///
/// Both physical and logical regions are accessible to the user, but the operations
/// they support are different. They are listed below:
///
/// | Operations | Logical Region | Physical Region |
/// | ---------- | -------------- | --------------- |
/// |   Create   |       ✅        |        ✅        |
/// |    Drop    |       ✅        |        ❓*       |
/// |   Write    |       ✅        |        ❌        |
/// |    Read    |       ✅        |        ✅        |
/// |   Close    |       ✅        |        ✅        |
/// |    Open    |       ✅        |        ✅        |
/// |   Alter    |       ✅        |        ❓*       |
///
/// *: Drop: A physical region can be dropped only when all related logical regions are dropped.
/// *: Alter: Physical regions only support altering region options.
///
/// ## Internal Columns
///
/// The physical data region contains two internal columns. Note that
/// "internal" here refers to the metric engine itself; the Mito
/// engine will add its own internal columns to the region as well.
///
/// Their column ids are registered in [`ReservedColumnId`], and their column names are
/// defined in [`DATA_SCHEMA_TSID_COLUMN_NAME`] and [`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`].
///
/// Tsid is generated by hashing all tags, and table id is retrieved from the logical region
/// id to distinguish data from different logical tables.
#[derive(Clone)]
pub struct MetricEngine {
    inner: Arc<MetricEngineInner>,
}
135
136#[async_trait]
137impl RegionEngine for MetricEngine {
    /// Returns the name of this engine, i.e. [`METRIC_ENGINE_NAME`].
    fn name(&self) -> &str {
        METRIC_ENGINE_NAME
    }
142
143    async fn handle_batch_open_requests(
144        &self,
145        parallelism: usize,
146        requests: Vec<(RegionId, RegionOpenRequest)>,
147    ) -> Result<BatchResponses, BoxedError> {
148        self.inner
149            .handle_batch_open_requests(parallelism, requests)
150            .await
151            .map_err(BoxedError::new)
152    }
153
154    async fn handle_batch_catchup_requests(
155        &self,
156        parallelism: usize,
157        requests: Vec<(RegionId, RegionCatchupRequest)>,
158    ) -> Result<BatchResponses, BoxedError> {
159        self.inner
160            .handle_batch_catchup_requests(parallelism, requests)
161            .await
162            .map_err(BoxedError::new)
163    }
164
165    async fn handle_batch_ddl_requests(
166        &self,
167        batch_request: BatchRegionDdlRequest,
168    ) -> Result<RegionResponse, BoxedError> {
169        match batch_request {
170            BatchRegionDdlRequest::Create(requests) => {
171                let mut extension_return_value = HashMap::new();
172                let rows = self
173                    .inner
174                    .create_regions(requests, &mut extension_return_value)
175                    .await
176                    .map_err(BoxedError::new)?;
177
178                Ok(RegionResponse {
179                    affected_rows: rows,
180                    extensions: extension_return_value,
181                    metadata: Vec::new(),
182                })
183            }
184            BatchRegionDdlRequest::Alter(requests) => {
185                let mut extension_return_value = HashMap::new();
186                let rows = self
187                    .inner
188                    .alter_regions(requests, &mut extension_return_value)
189                    .await
190                    .map_err(BoxedError::new)?;
191
192                Ok(RegionResponse {
193                    affected_rows: rows,
194                    extensions: extension_return_value,
195                    metadata: Vec::new(),
196                })
197            }
198            BatchRegionDdlRequest::Drop(requests) => {
199                self.handle_requests(
200                    requests
201                        .into_iter()
202                        .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
203                )
204                .await
205            }
206        }
207    }
208
209    /// Handles non-query request to the region. Returns the count of affected rows.
210    async fn handle_request(
211        &self,
212        region_id: RegionId,
213        request: RegionRequest,
214    ) -> Result<RegionResponse, BoxedError> {
215        let mut extension_return_value = HashMap::new();
216
217        let result = match request {
218            RegionRequest::EnterStaging(_) => {
219                if self.inner.is_physical_region(region_id) {
220                    self.handle_enter_staging_request(region_id, request).await
221                } else {
222                    UnsupportedRegionRequestSnafu { request }.fail()
223                }
224            }
225            RegionRequest::ApplyStagingManifest(_) => {
226                if self.inner.is_physical_region(region_id) {
227                    return self.inner.mito.handle_request(region_id, request).await;
228                } else {
229                    UnsupportedRegionRequestSnafu { request }.fail()
230                }
231            }
232            RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
233            RegionRequest::Create(create) => {
234                self.inner
235                    .create_regions(vec![(region_id, create)], &mut extension_return_value)
236                    .await
237            }
238            RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
239            RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
240            RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
241            RegionRequest::Alter(alter) => {
242                self.inner
243                    .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
244                    .await
245            }
246            RegionRequest::Compact(_) => {
247                if self.inner.is_physical_region(region_id) {
248                    self.inner
249                        .mito
250                        .handle_request(region_id, request)
251                        .await
252                        .context(error::MitoFlushOperationSnafu)
253                        .map(|response| response.affected_rows)
254                } else {
255                    UnsupportedRegionRequestSnafu { request }.fail()
256                }
257            }
258            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
259            RegionRequest::BuildIndex(_) => {
260                if self.inner.is_physical_region(region_id) {
261                    self.inner
262                        .mito
263                        .handle_request(region_id, request)
264                        .await
265                        .context(error::MitoFlushOperationSnafu)
266                        .map(|response| response.affected_rows)
267                } else {
268                    UnsupportedRegionRequestSnafu { request }.fail()
269                }
270            }
271            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
272            RegionRequest::Delete(delete) => self.inner.delete_region(region_id, delete).await,
273            RegionRequest::Catchup(_) => {
274                let mut response = self
275                    .inner
276                    .handle_batch_catchup_requests(
277                        1,
278                        vec![(region_id, RegionCatchupRequest::default())],
279                    )
280                    .await
281                    .map_err(BoxedError::new)?;
282                debug_assert_eq!(response.len(), 1);
283                let (resp_region_id, response) = response
284                    .pop()
285                    .context(error::UnexpectedRequestSnafu {
286                        reason: "expected 1 response, but got zero responses",
287                    })
288                    .map_err(BoxedError::new)?;
289                debug_assert_eq!(region_id, resp_region_id);
290                return response;
291            }
292            RegionRequest::BulkInserts(bulk) => {
293                self.inner.bulk_insert_region(region_id, bulk).await
294            }
295        };
296
297        result.map_err(BoxedError::new).map(|rows| RegionResponse {
298            affected_rows: rows,
299            extensions: extension_return_value,
300            metadata: Vec::new(),
301        })
302    }
303
    /// Handles a scan request on the region and returns the region scanner.
    ///
    /// This delegates to the inherent `MetricEngine::handle_query`. Inherent
    /// methods take precedence over trait methods during method resolution, so
    /// the call below does not recurse into this trait method.
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.handle_query(region_id, request).await
    }
311
312    async fn get_committed_sequence(
313        &self,
314        region_id: RegionId,
315    ) -> Result<SequenceNumber, BoxedError> {
316        self.inner
317            .get_last_seq_num(region_id)
318            .await
319            .map_err(BoxedError::new)
320    }
321
322    /// Retrieves region's metadata.
323    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
324        self.inner
325            .load_region_metadata(region_id)
326            .await
327            .map_err(BoxedError::new)
328    }
329
330    /// Retrieves region's disk usage.
331    ///
332    /// Note: Returns `None` if it's a logical region.
333    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
334        if self.inner.is_physical_region(region_id) {
335            get_region_statistic(&self.inner.mito, region_id)
336        } else {
337            None
338        }
339    }
340
    /// Stops the engine.
    ///
    /// This is a no-op that always returns `Ok(())`.
    async fn stop(&self) -> Result<(), BoxedError> {
        // don't need to stop the underlying mito engine
        Ok(())
    }
346
347    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
348        // ignore the region not found error
349        for x in [
350            utils::to_metadata_region_id(region_id),
351            utils::to_data_region_id(region_id),
352        ] {
353            if let Err(e) = self.inner.mito.set_region_role(x, role)
354                && e.status_code() != StatusCode::RegionNotFound
355            {
356                return Err(e);
357            }
358        }
359        Ok(())
360    }
361
362    async fn sync_region(
363        &self,
364        region_id: RegionId,
365        request: SyncRegionFromRequest,
366    ) -> Result<SyncRegionFromResponse, BoxedError> {
367        match request {
368            SyncRegionFromRequest::FromManifest(manifest_info) => self
369                .inner
370                .sync_region_from_manifest(region_id, manifest_info)
371                .await
372                .map_err(BoxedError::new),
373            SyncRegionFromRequest::FromRegion {
374                source_region_id,
375                parallelism,
376            } => {
377                if self.inner.is_physical_region(region_id) {
378                    self.inner
379                        .sync_region_from_region(region_id, source_region_id, parallelism)
380                        .await
381                        .map_err(BoxedError::new)
382                } else {
383                    Err(BoxedError::new(
384                        error::UnsupportedSyncRegionFromRequestSnafu { region_id }.build(),
385                    ))
386                }
387            }
388        }
389    }
390
391    async fn remap_manifests(
392        &self,
393        request: RemapManifestsRequest,
394    ) -> Result<RemapManifestsResponse, BoxedError> {
395        let region_id = request.region_id;
396        if self.inner.is_physical_region(region_id) {
397            self.inner.mito.remap_manifests(request).await
398        } else {
399            Err(BoxedError::new(
400                UnsupportedRemapManifestsRequestSnafu { region_id }.build(),
401            ))
402        }
403    }
404
405    async fn set_region_role_state_gracefully(
406        &self,
407        region_id: RegionId,
408        region_role_state: SettableRegionRoleState,
409    ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
410        let metadata_result = match self
411            .inner
412            .mito
413            .set_region_role_state_gracefully(
414                utils::to_metadata_region_id(region_id),
415                region_role_state,
416            )
417            .await?
418        {
419            SetRegionRoleStateResponse::Success(success) => success,
420            SetRegionRoleStateResponse::NotFound => {
421                return Ok(SetRegionRoleStateResponse::NotFound);
422            }
423            SetRegionRoleStateResponse::InvalidTransition(error) => {
424                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
425            }
426        };
427
428        let data_result = match self
429            .inner
430            .mito
431            .set_region_role_state_gracefully(region_id, region_role_state)
432            .await?
433        {
434            SetRegionRoleStateResponse::Success(success) => success,
435            SetRegionRoleStateResponse::NotFound => {
436                return Ok(SetRegionRoleStateResponse::NotFound);
437            }
438            SetRegionRoleStateResponse::InvalidTransition(error) => {
439                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
440            }
441        };
442
443        Ok(SetRegionRoleStateResponse::success(
444            SetRegionRoleStateSuccess::metric(
445                data_result.last_entry_id().unwrap_or_default(),
446                metadata_result.last_entry_id().unwrap_or_default(),
447            ),
448        ))
449    }
450
451    /// Returns the physical region role.
452    ///
453    /// Note: Returns `None` if it's a logical region.
454    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
455        if self.inner.is_physical_region(region_id) {
456            self.inner.mito.role(region_id)
457        } else {
458            None
459        }
460    }
461
    /// Returns `self` as a [`Any`] reference so callers can downcast to the
    /// concrete engine type.
    fn as_any(&self) -> &dyn Any {
        self
    }
465}
466
467impl MetricEngine {
468    pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
469        let metadata_region = MetadataRegion::new(mito.clone());
470        let data_region = DataRegion::new(mito.clone());
471        let state = Arc::new(RwLock::default());
472        config.sanitize();
473        let flush_interval = config.flush_metadata_region_interval;
474        let inner = Arc::new(MetricEngineInner {
475            mito: mito.clone(),
476            metadata_region,
477            data_region,
478            state: state.clone(),
479            config,
480            row_modifier: RowModifier::default(),
481            flush_task: RepeatedTask::new(
482                flush_interval,
483                Box::new(FlushMetadataRegionTask {
484                    state: state.clone(),
485                    mito: mito.clone(),
486                }),
487            ),
488        });
489        inner
490            .flush_task
491            .start(common_runtime::global_runtime())
492            .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
493        Ok(Self { inner })
494    }
495
    /// Returns a clone of the underlying mito engine handle.
    pub fn mito(&self) -> MitoEngine {
        self.inner.mito.clone()
    }
499
    /// Batch put operation for multiple logical regions.
    /// Requests are grouped by physical region; a failure can leave earlier
    /// physical-region groups committed.
    ///
    /// Delegates to the inner engine and returns the number of affected rows.
    pub async fn put_regions_batch(
        &self,
        requests: impl ExactSizeIterator<Item = (RegionId, RegionPutRequest)>,
    ) -> Result<AffectedRows> {
        self.inner.put_regions_batch(requests).await
    }
509
    /// Returns all logical regions associated with the physical region.
    ///
    /// The lookup is answered by the metadata region.
    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
        self.inner
            .metadata_region
            .logical_regions(physical_region_id)
            .await
    }
517
    /// Handles a scan request on the region and returns a region scanner.
    ///
    /// The request is served by the inner engine's `read_region`; any error is
    /// boxed into a [`BoxedError`].
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.inner
            .read_region(region_id, request)
            .await
            .map_err(BoxedError::new)
    }
529
530    async fn handle_requests(
531        &self,
532        requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
533    ) -> Result<RegionResponse, BoxedError> {
534        let mut affected_rows = 0;
535        let mut extensions = HashMap::new();
536        for (region_id, request) in requests {
537            let response = self.handle_request(region_id, request).await?;
538            affected_rows += response.affected_rows;
539            extensions.extend(response.extensions);
540        }
541
542        Ok(RegionResponse {
543            affected_rows,
544            extensions,
545            metadata: Vec::new(),
546        })
547    }
548}
549
/// Helpers that are only compiled for tests.
#[cfg(test)]
impl MetricEngine {
    /// Scans the region through the inner engine and returns the result as a
    /// record batch stream.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        self.inner.scan_to_stream(region_id, request).await
    }

    /// Returns the configuration of the engine.
    pub fn config(&self) -> &EngineConfig {
        &self.inner.config
    }
}
565
/// Shared state behind [`MetricEngine`]; the engine holds it in an `Arc`.
struct MetricEngineInner {
    /// The underlying mito engine that physical-region requests are forwarded to.
    mito: MitoEngine,
    /// Metadata region accessor, built on the same mito engine.
    metadata_region: MetadataRegion,
    /// Data region accessor, built on the same mito engine.
    data_region: DataRegion,
    /// Engine state, shared with the metadata-region flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration, sanitized when the engine is constructed.
    config: EngineConfig,
    /// Default-constructed row modifier.
    // NOTE(review): presumably used on the write path — confirm in `put`.
    row_modifier: RowModifier,
    /// Repeated task that flushes metadata regions; started when the engine
    /// is constructed.
    flush_task: RepeatedTask<Error>,
}
575
576#[cfg(test)]
577mod test {
578    use std::assert_matches;
579    use std::collections::HashMap;
580
581    use common_telemetry::info;
582    use common_wal::options::{KafkaWalOptions, WalOptions};
583    use mito2::sst::location::region_dir_from_table_dir;
584    use mito2::test_util::{kafka_log_store_factory, prepare_test_for_kafka_log_store};
585    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
586    use store_api::mito_engine_options::WAL_OPTIONS_KEY;
587    use store_api::region_request::{
588        PathType, RegionCloseRequest, RegionDropRequest, RegionFlushRequest, RegionOpenRequest,
589        RegionRequest,
590    };
591
592    use super::*;
593    use crate::maybe_skip_kafka_log_store_integration_test;
594    use crate::test_util::{TestEnv, create_logical_region_request};
595
596    #[tokio::test]
597    async fn close_open_regions() {
598        let env = TestEnv::new().await;
599        env.init_metric_region().await;
600        let engine = env.metric();
601
602        // close physical region
603        let physical_region_id = env.default_physical_region_id();
604        engine
605            .handle_request(
606                physical_region_id,
607                RegionRequest::Close(RegionCloseRequest {}),
608            )
609            .await
610            .unwrap();
611
612        // reopen physical region
613        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
614            .into_iter()
615            .collect();
616        let open_request = RegionOpenRequest {
617            engine: METRIC_ENGINE_NAME.to_string(),
618            table_dir: TestEnv::default_table_dir(),
619            path_type: PathType::Bare, // Use Bare path type for engine regions
620            options: physical_region_option,
621            skip_wal_replay: false,
622            checkpoint: None,
623        };
624        engine
625            .handle_request(physical_region_id, RegionRequest::Open(open_request))
626            .await
627            .unwrap();
628
629        // close nonexistent region won't report error
630        let nonexistent_region_id = RegionId::new(12313, 12);
631        engine
632            .handle_request(
633                nonexistent_region_id,
634                RegionRequest::Close(RegionCloseRequest {}),
635            )
636            .await
637            .unwrap();
638
639        // open nonexistent region won't report error
640        let invalid_open_request = RegionOpenRequest {
641            engine: METRIC_ENGINE_NAME.to_string(),
642            table_dir: TestEnv::default_table_dir(),
643            path_type: PathType::Bare, // Use Bare path type for engine regions
644            options: HashMap::new(),
645            skip_wal_replay: false,
646            checkpoint: None,
647        };
648        engine
649            .handle_request(
650                nonexistent_region_id,
651                RegionRequest::Open(invalid_open_request),
652            )
653            .await
654            .unwrap();
655    }
656
657    #[tokio::test]
658    async fn test_role() {
659        let env = TestEnv::new().await;
660        env.init_metric_region().await;
661
662        let logical_region_id = env.default_logical_region_id();
663        let physical_region_id = env.default_physical_region_id();
664
665        assert!(env.metric().role(logical_region_id).is_none());
666        assert!(env.metric().role(physical_region_id).is_some());
667    }
668
669    #[tokio::test]
670    async fn test_region_disk_usage() {
671        let env = TestEnv::new().await;
672        env.init_metric_region().await;
673
674        let logical_region_id = env.default_logical_region_id();
675        let physical_region_id = env.default_physical_region_id();
676
677        assert!(env.metric().region_statistic(logical_region_id).is_none());
678        assert!(env.metric().region_statistic(physical_region_id).is_some());
679    }
680
681    #[tokio::test]
682    async fn test_open_region_failure() {
683        let env = TestEnv::new().await;
684        env.init_metric_region().await;
685        let physical_region_id = env.default_physical_region_id();
686
687        let metric_engine = env.metric();
688        metric_engine
689            .handle_request(
690                physical_region_id,
691                RegionRequest::Flush(RegionFlushRequest {
692                    row_group_size: None,
693                }),
694            )
695            .await
696            .unwrap();
697
698        let path = region_dir_from_table_dir(
699            &TestEnv::default_table_dir(),
700            physical_region_id,
701            PathType::Metadata,
702        );
703        let object_store = env.get_object_store().unwrap();
704        let list = object_store.list(&path).await.unwrap();
705        // Delete parquet files in metadata region
706        for entry in list {
707            if entry.metadata().is_dir() {
708                continue;
709            }
710            if entry.name().ends_with("parquet") {
711                info!("deleting {}", entry.path());
712                object_store.delete(entry.path()).await.unwrap();
713            }
714        }
715
716        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
717            .into_iter()
718            .collect();
719        let open_request = RegionOpenRequest {
720            engine: METRIC_ENGINE_NAME.to_string(),
721            table_dir: TestEnv::default_table_dir(),
722            path_type: PathType::Bare,
723            options: physical_region_option,
724            skip_wal_replay: false,
725            checkpoint: None,
726        };
727        // Opening an already opened region should succeed.
728        // Since the region is already open, no metadata recovery operations will be performed.
729        metric_engine
730            .handle_request(physical_region_id, RegionRequest::Open(open_request))
731            .await
732            .unwrap();
733
734        // Close the region
735        metric_engine
736            .handle_request(
737                physical_region_id,
738                RegionRequest::Close(RegionCloseRequest {}),
739            )
740            .await
741            .unwrap();
742
743        // Try to reopen region.
744        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
745            .into_iter()
746            .collect();
747        let open_request = RegionOpenRequest {
748            engine: METRIC_ENGINE_NAME.to_string(),
749            table_dir: TestEnv::default_table_dir(),
750            path_type: PathType::Bare,
751            options: physical_region_option,
752            skip_wal_replay: false,
753            checkpoint: None,
754        };
755        let err = metric_engine
756            .handle_request(physical_region_id, RegionRequest::Open(open_request))
757            .await
758            .unwrap_err();
759        // Failed to open region because of missing parquet files.
760        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
761
762        let mito_engine = metric_engine.mito();
763        let data_region_id = utils::to_data_region_id(physical_region_id);
764        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
765        // The metadata/data region should be closed.
766        let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
767        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
768        let err = mito_engine
769            .get_metadata(metadata_region_id)
770            .await
771            .unwrap_err();
772        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
773    }
774
775    #[tokio::test]
776    async fn test_catchup_regions() {
777        common_telemetry::init_default_ut_logging();
778        maybe_skip_kafka_log_store_integration_test!();
779        let kafka_log_store_factory = kafka_log_store_factory().unwrap();
780        let mito_env = mito2::test_util::TestEnv::new()
781            .await
782            .with_log_store_factory(kafka_log_store_factory.clone());
783        let env = TestEnv::with_mito_env(mito_env).await;
784        let table_dir = |region_id| format!("table/{region_id}");
785        let mut physical_region_ids = vec![];
786        let mut logical_region_ids = vec![];
787
788        let num_topics = 3;
789        let num_physical_regions = 8;
790        let num_logical_regions = 16;
791        let parallelism = 2;
792        let mut topics = Vec::with_capacity(num_topics);
793        for _ in 0..num_topics {
794            let topic = prepare_test_for_kafka_log_store(&kafka_log_store_factory)
795                .await
796                .unwrap();
797            topics.push(topic);
798        }
799
800        let topic_idx = |id| (id as usize) % num_topics;
801        // Creates physical regions
802        for i in 0..num_physical_regions {
803            let physical_region_id = RegionId::new(1, i);
804            physical_region_ids.push(physical_region_id);
805
806            let wal_options = WalOptions::Kafka(KafkaWalOptions {
807                topic: topics[topic_idx(i)].clone(),
808            });
809            env.create_physical_region(
810                physical_region_id,
811                &table_dir(physical_region_id),
812                vec![(
813                    WAL_OPTIONS_KEY.to_string(),
814                    serde_json::to_string(&wal_options).unwrap(),
815                )],
816            )
817            .await;
818            // Creates logical regions for each physical region
819            for j in 0..num_logical_regions {
820                let logical_region_id = RegionId::new(1024 + i, j);
821                logical_region_ids.push(logical_region_id);
822                env.create_logical_region(physical_region_id, logical_region_id)
823                    .await;
824            }
825        }
826
827        let metric_engine = env.metric();
828        // Closes all regions
829        for region_id in logical_region_ids.iter().chain(physical_region_ids.iter()) {
830            metric_engine
831                .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
832                .await
833                .unwrap();
834        }
835
836        // Opens all regions and skip the wal
837        let requests = physical_region_ids
838            .iter()
839            .enumerate()
840            .map(|(idx, region_id)| {
841                let mut options = HashMap::new();
842                let wal_options = WalOptions::Kafka(KafkaWalOptions {
843                    topic: topics[topic_idx(idx as u32)].clone(),
844                });
845                options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new());
846                options.insert(
847                    WAL_OPTIONS_KEY.to_string(),
848                    serde_json::to_string(&wal_options).unwrap(),
849                );
850                (
851                    *region_id,
852                    RegionOpenRequest {
853                        engine: METRIC_ENGINE_NAME.to_string(),
854                        table_dir: table_dir(*region_id),
855                        path_type: PathType::Bare,
856                        options: options.clone(),
857                        skip_wal_replay: true,
858                        checkpoint: None,
859                    },
860                )
861            })
862            .collect::<Vec<_>>();
863        info!("Open batch regions with parallelism: {parallelism}");
864        metric_engine
865            .handle_batch_open_requests(parallelism, requests)
866            .await
867            .unwrap();
868        {
869            let state = metric_engine.inner.state.read().unwrap();
870            for logical_region in &logical_region_ids {
871                assert!(!state.logical_regions().contains_key(logical_region));
872            }
873        }
874
875        let catch_requests = physical_region_ids
876            .iter()
877            .map(|region_id| {
878                (
879                    *region_id,
880                    RegionCatchupRequest {
881                        set_writable: true,
882                        ..Default::default()
883                    },
884                )
885            })
886            .collect::<Vec<_>>();
887        metric_engine
888            .handle_batch_catchup_requests(parallelism, catch_requests)
889            .await
890            .unwrap();
891        {
892            let state = metric_engine.inner.state.read().unwrap();
893            for logical_region in &logical_region_ids {
894                assert!(state.logical_regions().contains_key(logical_region));
895            }
896        }
897    }
898
899    #[tokio::test]
900    async fn test_drop_region() {
901        let env = TestEnv::new().await;
902        let engine = env.metric();
903        let physical_region_id1 = RegionId::new(1024, 0);
904        let logical_region_id1 = RegionId::new(1025, 0);
905        env.create_physical_region(physical_region_id1, "/test_dir1", vec![])
906            .await;
907        let region_create_request1 =
908            create_logical_region_request(&["job"], physical_region_id1, "logical1");
909        engine
910            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![(
911                logical_region_id1,
912                region_create_request1,
913            )]))
914            .await
915            .unwrap();
916        let err = engine
917            .handle_request(
918                physical_region_id1,
919                RegionRequest::Drop(RegionDropRequest {
920                    fast_path: false,
921                    force: false,
922                    partial_drop: false,
923                }),
924            )
925            .await
926            .unwrap_err();
927        assert_matches!(
928            err.as_any().downcast_ref::<Error>().unwrap(),
929            &Error::PhysicalRegionBusy { .. }
930        );
931
932        engine
933            .handle_request(
934                physical_region_id1,
935                RegionRequest::Drop(RegionDropRequest {
936                    fast_path: false,
937                    force: true,
938                    partial_drop: false,
939                }),
940            )
941            .await
942            .unwrap();
943        assert!(
944            engine
945                .inner
946                .state
947                .read()
948                .unwrap()
949                .physical_region_states()
950                .get(&physical_region_id1)
951                .is_none()
952        );
953        assert!(
954            engine
955                .inner
956                .state
957                .read()
958                .unwrap()
959                .logical_regions()
960                .get(&logical_region_id1)
961                .is_none()
962        );
963    }
964}