1mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod staging;
27mod state;
28mod sync;
29
30use std::any::Any;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33
34use api::region::RegionResponse;
35use async_trait::async_trait;
36use common_error::ext::{BoxedError, ErrorExt};
37use common_error::status_code::StatusCode;
38use common_runtime::RepeatedTask;
39use mito2::engine::MitoEngine;
40pub(crate) use options::IndexOptions;
41use snafu::{OptionExt, ResultExt};
42pub(crate) use state::MetricEngineState;
43use store_api::metadata::RegionMetadataRef;
44use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
45use store_api::region_engine::{
46 BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic,
47 RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
48 SetRegionRoleStateSuccess, SettableRegionRoleState, SyncRegionFromRequest,
49 SyncRegionFromResponse,
50};
51use store_api::region_request::{
52 BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
53};
54use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
55
56use crate::config::EngineConfig;
57use crate::data_region::DataRegion;
58use crate::error::{
59 self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu,
60 UnsupportedRemapManifestsRequestSnafu,
61};
62use crate::metadata_region::MetadataRegion;
63use crate::repeated_task::FlushMetadataRegionTask;
64use crate::row_modifier::RowModifier;
65use crate::utils::{self, get_region_statistic};
66
67#[cfg_attr(doc, aquamarine::aquamarine)]
68#[derive(Clone)]
130pub struct MetricEngine {
131 inner: Arc<MetricEngineInner>,
132}
133
134#[async_trait]
135impl RegionEngine for MetricEngine {
136 fn name(&self) -> &str {
138 METRIC_ENGINE_NAME
139 }
140
141 async fn handle_batch_open_requests(
142 &self,
143 parallelism: usize,
144 requests: Vec<(RegionId, RegionOpenRequest)>,
145 ) -> Result<BatchResponses, BoxedError> {
146 self.inner
147 .handle_batch_open_requests(parallelism, requests)
148 .await
149 .map_err(BoxedError::new)
150 }
151
152 async fn handle_batch_catchup_requests(
153 &self,
154 parallelism: usize,
155 requests: Vec<(RegionId, RegionCatchupRequest)>,
156 ) -> Result<BatchResponses, BoxedError> {
157 self.inner
158 .handle_batch_catchup_requests(parallelism, requests)
159 .await
160 .map_err(BoxedError::new)
161 }
162
163 async fn handle_batch_ddl_requests(
164 &self,
165 batch_request: BatchRegionDdlRequest,
166 ) -> Result<RegionResponse, BoxedError> {
167 match batch_request {
168 BatchRegionDdlRequest::Create(requests) => {
169 let mut extension_return_value = HashMap::new();
170 let rows = self
171 .inner
172 .create_regions(requests, &mut extension_return_value)
173 .await
174 .map_err(BoxedError::new)?;
175
176 Ok(RegionResponse {
177 affected_rows: rows,
178 extensions: extension_return_value,
179 metadata: Vec::new(),
180 })
181 }
182 BatchRegionDdlRequest::Alter(requests) => {
183 let mut extension_return_value = HashMap::new();
184 let rows = self
185 .inner
186 .alter_regions(requests, &mut extension_return_value)
187 .await
188 .map_err(BoxedError::new)?;
189
190 Ok(RegionResponse {
191 affected_rows: rows,
192 extensions: extension_return_value,
193 metadata: Vec::new(),
194 })
195 }
196 BatchRegionDdlRequest::Drop(requests) => {
197 self.handle_requests(
198 requests
199 .into_iter()
200 .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
201 )
202 .await
203 }
204 }
205 }
206
207 async fn handle_request(
209 &self,
210 region_id: RegionId,
211 request: RegionRequest,
212 ) -> Result<RegionResponse, BoxedError> {
213 let mut extension_return_value = HashMap::new();
214
215 let result = match request {
216 RegionRequest::EnterStaging(_) => {
217 if self.inner.is_physical_region(region_id) {
218 self.handle_enter_staging_request(region_id, request).await
219 } else {
220 UnsupportedRegionRequestSnafu { request }.fail()
221 }
222 }
223 RegionRequest::ApplyStagingManifest(_) => {
224 if self.inner.is_physical_region(region_id) {
225 return self.inner.mito.handle_request(region_id, request).await;
226 } else {
227 UnsupportedRegionRequestSnafu { request }.fail()
228 }
229 }
230 RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
231 RegionRequest::Create(create) => {
232 self.inner
233 .create_regions(vec![(region_id, create)], &mut extension_return_value)
234 .await
235 }
236 RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
237 RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
238 RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
239 RegionRequest::Alter(alter) => {
240 self.inner
241 .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
242 .await
243 }
244 RegionRequest::Compact(_) => {
245 if self.inner.is_physical_region(region_id) {
246 self.inner
247 .mito
248 .handle_request(region_id, request)
249 .await
250 .context(error::MitoFlushOperationSnafu)
251 .map(|response| response.affected_rows)
252 } else {
253 UnsupportedRegionRequestSnafu { request }.fail()
254 }
255 }
256 RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
257 RegionRequest::BuildIndex(_) => {
258 if self.inner.is_physical_region(region_id) {
259 self.inner
260 .mito
261 .handle_request(region_id, request)
262 .await
263 .context(error::MitoFlushOperationSnafu)
264 .map(|response| response.affected_rows)
265 } else {
266 UnsupportedRegionRequestSnafu { request }.fail()
267 }
268 }
269 RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
270 RegionRequest::Delete(delete) => self.inner.delete_region(region_id, delete).await,
271 RegionRequest::Catchup(_) => {
272 let mut response = self
273 .inner
274 .handle_batch_catchup_requests(
275 1,
276 vec![(region_id, RegionCatchupRequest::default())],
277 )
278 .await
279 .map_err(BoxedError::new)?;
280 debug_assert_eq!(response.len(), 1);
281 let (resp_region_id, response) = response
282 .pop()
283 .context(error::UnexpectedRequestSnafu {
284 reason: "expected 1 response, but got zero responses",
285 })
286 .map_err(BoxedError::new)?;
287 debug_assert_eq!(region_id, resp_region_id);
288 return response;
289 }
290 RegionRequest::BulkInserts(_) => {
291 UnsupportedRegionRequestSnafu { request }.fail()
293 }
294 };
295
296 result.map_err(BoxedError::new).map(|rows| RegionResponse {
297 affected_rows: rows,
298 extensions: extension_return_value,
299 metadata: Vec::new(),
300 })
301 }
302
303 async fn handle_query(
304 &self,
305 region_id: RegionId,
306 request: ScanRequest,
307 ) -> Result<RegionScannerRef, BoxedError> {
308 self.handle_query(region_id, request).await
309 }
310
311 async fn get_committed_sequence(
312 &self,
313 region_id: RegionId,
314 ) -> Result<SequenceNumber, BoxedError> {
315 self.inner
316 .get_last_seq_num(region_id)
317 .await
318 .map_err(BoxedError::new)
319 }
320
321 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
323 self.inner
324 .load_region_metadata(region_id)
325 .await
326 .map_err(BoxedError::new)
327 }
328
329 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
333 if self.inner.is_physical_region(region_id) {
334 get_region_statistic(&self.inner.mito, region_id)
335 } else {
336 None
337 }
338 }
339
340 async fn stop(&self) -> Result<(), BoxedError> {
342 Ok(())
344 }
345
346 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
347 for x in [
349 utils::to_metadata_region_id(region_id),
350 utils::to_data_region_id(region_id),
351 ] {
352 if let Err(e) = self.inner.mito.set_region_role(x, role)
353 && e.status_code() != StatusCode::RegionNotFound
354 {
355 return Err(e);
356 }
357 }
358 Ok(())
359 }
360
361 async fn sync_region(
362 &self,
363 region_id: RegionId,
364 request: SyncRegionFromRequest,
365 ) -> Result<SyncRegionFromResponse, BoxedError> {
366 match request {
367 SyncRegionFromRequest::FromManifest(manifest_info) => self
368 .inner
369 .sync_region_from_manifest(region_id, manifest_info)
370 .await
371 .map_err(BoxedError::new),
372 SyncRegionFromRequest::FromRegion {
373 source_region_id,
374 parallelism,
375 } => {
376 if self.inner.is_physical_region(region_id) {
377 self.inner
378 .sync_region_from_region(region_id, source_region_id, parallelism)
379 .await
380 .map_err(BoxedError::new)
381 } else {
382 Err(BoxedError::new(
383 error::UnsupportedSyncRegionFromRequestSnafu { region_id }.build(),
384 ))
385 }
386 }
387 }
388 }
389
390 async fn remap_manifests(
391 &self,
392 request: RemapManifestsRequest,
393 ) -> Result<RemapManifestsResponse, BoxedError> {
394 let region_id = request.region_id;
395 if self.inner.is_physical_region(region_id) {
396 self.inner.mito.remap_manifests(request).await
397 } else {
398 Err(BoxedError::new(
399 UnsupportedRemapManifestsRequestSnafu { region_id }.build(),
400 ))
401 }
402 }
403
404 async fn set_region_role_state_gracefully(
405 &self,
406 region_id: RegionId,
407 region_role_state: SettableRegionRoleState,
408 ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
409 let metadata_result = match self
410 .inner
411 .mito
412 .set_region_role_state_gracefully(
413 utils::to_metadata_region_id(region_id),
414 region_role_state,
415 )
416 .await?
417 {
418 SetRegionRoleStateResponse::Success(success) => success,
419 SetRegionRoleStateResponse::NotFound => {
420 return Ok(SetRegionRoleStateResponse::NotFound);
421 }
422 SetRegionRoleStateResponse::InvalidTransition(error) => {
423 return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
424 }
425 };
426
427 let data_result = match self
428 .inner
429 .mito
430 .set_region_role_state_gracefully(region_id, region_role_state)
431 .await?
432 {
433 SetRegionRoleStateResponse::Success(success) => success,
434 SetRegionRoleStateResponse::NotFound => {
435 return Ok(SetRegionRoleStateResponse::NotFound);
436 }
437 SetRegionRoleStateResponse::InvalidTransition(error) => {
438 return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
439 }
440 };
441
442 Ok(SetRegionRoleStateResponse::success(
443 SetRegionRoleStateSuccess::metric(
444 data_result.last_entry_id().unwrap_or_default(),
445 metadata_result.last_entry_id().unwrap_or_default(),
446 ),
447 ))
448 }
449
450 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
454 if self.inner.is_physical_region(region_id) {
455 self.inner.mito.role(region_id)
456 } else {
457 None
458 }
459 }
460
461 fn as_any(&self) -> &dyn Any {
462 self
463 }
464}
465
466impl MetricEngine {
467 pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
468 let metadata_region = MetadataRegion::new(mito.clone());
469 let data_region = DataRegion::new(mito.clone());
470 let state = Arc::new(RwLock::default());
471 config.sanitize();
472 let flush_interval = config.flush_metadata_region_interval;
473 let inner = Arc::new(MetricEngineInner {
474 mito: mito.clone(),
475 metadata_region,
476 data_region,
477 state: state.clone(),
478 config,
479 row_modifier: RowModifier::default(),
480 flush_task: RepeatedTask::new(
481 flush_interval,
482 Box::new(FlushMetadataRegionTask {
483 state: state.clone(),
484 mito: mito.clone(),
485 }),
486 ),
487 });
488 inner
489 .flush_task
490 .start(common_runtime::global_runtime())
491 .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
492 Ok(Self { inner })
493 }
494
495 pub fn mito(&self) -> MitoEngine {
496 self.inner.mito.clone()
497 }
498
499 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
501 self.inner
502 .metadata_region
503 .logical_regions(physical_region_id)
504 .await
505 }
506
507 async fn handle_query(
509 &self,
510 region_id: RegionId,
511 request: ScanRequest,
512 ) -> Result<RegionScannerRef, BoxedError> {
513 self.inner
514 .read_region(region_id, request)
515 .await
516 .map_err(BoxedError::new)
517 }
518
519 async fn handle_requests(
520 &self,
521 requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
522 ) -> Result<RegionResponse, BoxedError> {
523 let mut affected_rows = 0;
524 let mut extensions = HashMap::new();
525 for (region_id, request) in requests {
526 let response = self.handle_request(region_id, request).await?;
527 affected_rows += response.affected_rows;
528 extensions.extend(response.extensions);
529 }
530
531 Ok(RegionResponse {
532 affected_rows,
533 extensions,
534 metadata: Vec::new(),
535 })
536 }
537}
538
539#[cfg(test)]
540impl MetricEngine {
541 pub async fn scan_to_stream(
542 &self,
543 region_id: RegionId,
544 request: ScanRequest,
545 ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
546 self.inner.scan_to_stream(region_id, request).await
547 }
548
549 pub fn config(&self) -> &EngineConfig {
551 &self.inner.config
552 }
553}
554
555struct MetricEngineInner {
556 mito: MitoEngine,
557 metadata_region: MetadataRegion,
558 data_region: DataRegion,
559 state: Arc<RwLock<MetricEngineState>>,
560 config: EngineConfig,
561 row_modifier: RowModifier,
562 flush_task: RepeatedTask<Error>,
563}
564
565#[cfg(test)]
566mod test {
567 use std::collections::HashMap;
568
569 use common_telemetry::info;
570 use common_wal::options::{KafkaWalOptions, WalOptions};
571 use mito2::sst::location::region_dir_from_table_dir;
572 use mito2::test_util::{kafka_log_store_factory, prepare_test_for_kafka_log_store};
573 use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
574 use store_api::mito_engine_options::WAL_OPTIONS_KEY;
575 use store_api::region_request::{
576 PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
577 };
578
579 use super::*;
580 use crate::maybe_skip_kafka_log_store_integration_test;
581 use crate::test_util::TestEnv;
582
583 #[tokio::test]
584 async fn close_open_regions() {
585 let env = TestEnv::new().await;
586 env.init_metric_region().await;
587 let engine = env.metric();
588
589 let physical_region_id = env.default_physical_region_id();
591 engine
592 .handle_request(
593 physical_region_id,
594 RegionRequest::Close(RegionCloseRequest {}),
595 )
596 .await
597 .unwrap();
598
599 let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
601 .into_iter()
602 .collect();
603 let open_request = RegionOpenRequest {
604 engine: METRIC_ENGINE_NAME.to_string(),
605 table_dir: TestEnv::default_table_dir(),
606 path_type: PathType::Bare, options: physical_region_option,
608 skip_wal_replay: false,
609 checkpoint: None,
610 };
611 engine
612 .handle_request(physical_region_id, RegionRequest::Open(open_request))
613 .await
614 .unwrap();
615
616 let nonexistent_region_id = RegionId::new(12313, 12);
618 engine
619 .handle_request(
620 nonexistent_region_id,
621 RegionRequest::Close(RegionCloseRequest {}),
622 )
623 .await
624 .unwrap();
625
626 let invalid_open_request = RegionOpenRequest {
628 engine: METRIC_ENGINE_NAME.to_string(),
629 table_dir: TestEnv::default_table_dir(),
630 path_type: PathType::Bare, options: HashMap::new(),
632 skip_wal_replay: false,
633 checkpoint: None,
634 };
635 engine
636 .handle_request(
637 nonexistent_region_id,
638 RegionRequest::Open(invalid_open_request),
639 )
640 .await
641 .unwrap();
642 }
643
644 #[tokio::test]
645 async fn test_role() {
646 let env = TestEnv::new().await;
647 env.init_metric_region().await;
648
649 let logical_region_id = env.default_logical_region_id();
650 let physical_region_id = env.default_physical_region_id();
651
652 assert!(env.metric().role(logical_region_id).is_none());
653 assert!(env.metric().role(physical_region_id).is_some());
654 }
655
656 #[tokio::test]
657 async fn test_region_disk_usage() {
658 let env = TestEnv::new().await;
659 env.init_metric_region().await;
660
661 let logical_region_id = env.default_logical_region_id();
662 let physical_region_id = env.default_physical_region_id();
663
664 assert!(env.metric().region_statistic(logical_region_id).is_none());
665 assert!(env.metric().region_statistic(physical_region_id).is_some());
666 }
667
668 #[tokio::test]
669 async fn test_open_region_failure() {
670 let env = TestEnv::new().await;
671 env.init_metric_region().await;
672 let physical_region_id = env.default_physical_region_id();
673
674 let metric_engine = env.metric();
675 metric_engine
676 .handle_request(
677 physical_region_id,
678 RegionRequest::Flush(RegionFlushRequest {
679 row_group_size: None,
680 }),
681 )
682 .await
683 .unwrap();
684
685 let path = region_dir_from_table_dir(
686 &TestEnv::default_table_dir(),
687 physical_region_id,
688 PathType::Metadata,
689 );
690 let object_store = env.get_object_store().unwrap();
691 let list = object_store.list(&path).await.unwrap();
692 for entry in list {
694 if entry.metadata().is_dir() {
695 continue;
696 }
697 if entry.name().ends_with("parquet") {
698 info!("deleting {}", entry.path());
699 object_store.delete(entry.path()).await.unwrap();
700 }
701 }
702
703 let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
704 .into_iter()
705 .collect();
706 let open_request = RegionOpenRequest {
707 engine: METRIC_ENGINE_NAME.to_string(),
708 table_dir: TestEnv::default_table_dir(),
709 path_type: PathType::Bare,
710 options: physical_region_option,
711 skip_wal_replay: false,
712 checkpoint: None,
713 };
714 metric_engine
717 .handle_request(physical_region_id, RegionRequest::Open(open_request))
718 .await
719 .unwrap();
720
721 metric_engine
723 .handle_request(
724 physical_region_id,
725 RegionRequest::Close(RegionCloseRequest {}),
726 )
727 .await
728 .unwrap();
729
730 let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
732 .into_iter()
733 .collect();
734 let open_request = RegionOpenRequest {
735 engine: METRIC_ENGINE_NAME.to_string(),
736 table_dir: TestEnv::default_table_dir(),
737 path_type: PathType::Bare,
738 options: physical_region_option,
739 skip_wal_replay: false,
740 checkpoint: None,
741 };
742 let err = metric_engine
743 .handle_request(physical_region_id, RegionRequest::Open(open_request))
744 .await
745 .unwrap_err();
746 assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
748
749 let mito_engine = metric_engine.mito();
750 let data_region_id = utils::to_data_region_id(physical_region_id);
751 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
752 let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
754 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
755 let err = mito_engine
756 .get_metadata(metadata_region_id)
757 .await
758 .unwrap_err();
759 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
760 }
761
762 #[tokio::test]
763 async fn test_catchup_regions() {
764 common_telemetry::init_default_ut_logging();
765 maybe_skip_kafka_log_store_integration_test!();
766 let kafka_log_store_factory = kafka_log_store_factory().unwrap();
767 let mito_env = mito2::test_util::TestEnv::new()
768 .await
769 .with_log_store_factory(kafka_log_store_factory.clone());
770 let env = TestEnv::with_mito_env(mito_env).await;
771 let table_dir = |region_id| format!("table/{region_id}");
772 let mut physical_region_ids = vec![];
773 let mut logical_region_ids = vec![];
774
775 let num_topics = 3;
776 let num_physical_regions = 8;
777 let num_logical_regions = 16;
778 let parallelism = 2;
779 let mut topics = Vec::with_capacity(num_topics);
780 for _ in 0..num_topics {
781 let topic = prepare_test_for_kafka_log_store(&kafka_log_store_factory)
782 .await
783 .unwrap();
784 topics.push(topic);
785 }
786
787 let topic_idx = |id| (id as usize) % num_topics;
788 for i in 0..num_physical_regions {
790 let physical_region_id = RegionId::new(1, i);
791 physical_region_ids.push(physical_region_id);
792
793 let wal_options = WalOptions::Kafka(KafkaWalOptions {
794 topic: topics[topic_idx(i)].clone(),
795 });
796 env.create_physical_region(
797 physical_region_id,
798 &table_dir(physical_region_id),
799 vec![(
800 WAL_OPTIONS_KEY.to_string(),
801 serde_json::to_string(&wal_options).unwrap(),
802 )],
803 )
804 .await;
805 for j in 0..num_logical_regions {
807 let logical_region_id = RegionId::new(1024 + i, j);
808 logical_region_ids.push(logical_region_id);
809 env.create_logical_region(physical_region_id, logical_region_id)
810 .await;
811 }
812 }
813
814 let metric_engine = env.metric();
815 for region_id in logical_region_ids.iter().chain(physical_region_ids.iter()) {
817 metric_engine
818 .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
819 .await
820 .unwrap();
821 }
822
823 let requests = physical_region_ids
825 .iter()
826 .enumerate()
827 .map(|(idx, region_id)| {
828 let mut options = HashMap::new();
829 let wal_options = WalOptions::Kafka(KafkaWalOptions {
830 topic: topics[topic_idx(idx as u32)].clone(),
831 });
832 options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new());
833 options.insert(
834 WAL_OPTIONS_KEY.to_string(),
835 serde_json::to_string(&wal_options).unwrap(),
836 );
837 (
838 *region_id,
839 RegionOpenRequest {
840 engine: METRIC_ENGINE_NAME.to_string(),
841 table_dir: table_dir(*region_id),
842 path_type: PathType::Bare,
843 options: options.clone(),
844 skip_wal_replay: true,
845 checkpoint: None,
846 },
847 )
848 })
849 .collect::<Vec<_>>();
850 info!("Open batch regions with parallelism: {parallelism}");
851 metric_engine
852 .handle_batch_open_requests(parallelism, requests)
853 .await
854 .unwrap();
855 {
856 let state = metric_engine.inner.state.read().unwrap();
857 for logical_region in &logical_region_ids {
858 assert!(!state.logical_regions().contains_key(logical_region));
859 }
860 }
861
862 let catch_requests = physical_region_ids
863 .iter()
864 .map(|region_id| {
865 (
866 *region_id,
867 RegionCatchupRequest {
868 set_writable: true,
869 ..Default::default()
870 },
871 )
872 })
873 .collect::<Vec<_>>();
874 metric_engine
875 .handle_batch_catchup_requests(parallelism, catch_requests)
876 .await
877 .unwrap();
878 {
879 let state = metric_engine.inner.state.read().unwrap();
880 for logical_region in &logical_region_ids {
881 assert!(state.logical_regions().contains_key(logical_region));
882 }
883 }
884 }
885}