1mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28 ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use either::Either;
48use futures_util::future::try_join_all;
49use metric_engine::engine::MetricEngine;
50use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
51use prost::Message;
52use query::QueryEngineRef;
53pub use query::dummy_catalog::{
54 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
55};
56use serde_json;
57use servers::error::{
58 self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
59};
60use servers::grpc::FlightCompression;
61use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
62use servers::grpc::region_server::RegionServerHandler;
63use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metric_engine_consts::{
66 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
67};
68use store_api::region_engine::{
69 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
70 RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
71 SyncRegionFromRequest,
72};
73use store_api::region_request::{
74 AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
75 RegionOpenRequest, RegionRequest,
76};
77use store_api::storage::RegionId;
78use tokio::sync::{Semaphore, SemaphorePermit};
79use tokio::time::timeout;
80use tonic::{Request, Response, Result as TonicResult};
81
82use crate::error::{
83 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
84 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
85 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
86 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
87 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
88 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
89};
90use crate::event_listener::RegionServerEventListenerRef;
91use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
92
/// Cloneable handle to the region server.
///
/// All clones share the same inner state and the same suspend flag, since
/// both are kept behind an [`Arc`].
#[derive(Clone)]
pub struct RegionServer {
    /// Shared state: registered engines, the region map and the query machinery.
    inner: Arc<RegionServerInner>,
    /// Compression setting handed to every Flight record batch stream built in `do_get`.
    flight_compression: FlightCompression,
    /// While `true`, `do_get` rejects incoming Flight requests.
    suspend: Arc<AtomicBool>,
}
99
/// Identity, engine name and role of a single region, as produced by
/// `RegionServer::reportable_regions`.
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine the region is mapped to.
    pub engine: String,
    /// Role the engine reports for the region.
    pub role: RegionRole,
}
105
106impl RegionServer {
107 pub fn new(
108 query_engine: QueryEngineRef,
109 runtime: Runtime,
110 event_listener: RegionServerEventListenerRef,
111 flight_compression: FlightCompression,
112 ) -> Self {
113 Self::with_table_provider(
114 query_engine,
115 runtime,
116 event_listener,
117 Arc::new(DummyTableProviderFactory),
118 0,
119 Duration::from_millis(0),
120 flight_compression,
121 )
122 }
123
124 pub fn with_table_provider(
125 query_engine: QueryEngineRef,
126 runtime: Runtime,
127 event_listener: RegionServerEventListenerRef,
128 table_provider_factory: TableProviderFactoryRef,
129 max_concurrent_queries: usize,
130 concurrent_query_limiter_timeout: Duration,
131 flight_compression: FlightCompression,
132 ) -> Self {
133 Self {
134 inner: Arc::new(RegionServerInner::new(
135 query_engine,
136 runtime,
137 event_listener,
138 table_provider_factory,
139 RegionServerParallelism::from_opts(
140 max_concurrent_queries,
141 concurrent_query_limiter_timeout,
142 ),
143 )),
144 flight_compression,
145 suspend: Arc::new(AtomicBool::new(false)),
146 }
147 }
148
    /// Registers a region engine with the inner server.
    ///
    /// Takes `&mut self` although the inner call only needs shared access.
    pub fn register_engine(&mut self, engine: RegionEngineRef) {
        self.inner.register_engine(engine);
    }
153
    /// Installs the reporter that `topic_stats` reads from, replacing any previous one.
    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
        self.inner.set_topic_stats_reporter(topic_stats_reporter);
    }
158
159 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
161 match self.inner.get_engine(region_id, &RegionChange::None) {
162 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
163 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
164 Err(error::Error::RegionNotFound { .. }) => Ok(None),
165 Err(err) => Err(err),
166 }
167 }
168
169 pub fn mito_engine(&self) -> Option<MitoEngine> {
171 if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
172 Some(mito)
173 } else {
174 self.inner
175 .engines
176 .read()
177 .unwrap()
178 .get(MITO_ENGINE_NAME)
179 .cloned()
180 .and_then(|e| {
181 let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
182 if mito.is_none() {
183 warn!("Mito engine not found in region server engines");
184 }
185 mito
186 })
187 }
188 }
189
    /// Opens a batch of regions by forwarding to the inner server.
    ///
    /// `parallelism` and `ignore_nonexistent_region` are passed through
    /// unchanged. Returns the ids of the regions that were opened.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
        ignore_nonexistent_region: bool,
    ) -> Result<Vec<RegionId>> {
        self.inner
            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
            .await
    }
201
    /// Runs catchup for a batch of regions by forwarding to the inner server.
    ///
    /// The outer `Result` covers the batch as a whole; each returned pair
    /// carries the per-region outcome.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
    }
212
    /// Handles a single region request by forwarding it to the inner server.
    ///
    /// The request type is recorded as the `request_type` field of the tracing span.
    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
    pub async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse> {
        self.inner.handle_request(region_id, request).await
    }
221
222 async fn table_provider(
224 &self,
225 region_id: RegionId,
226 ctx: Option<QueryContextRef>,
227 ) -> Result<Arc<dyn TableProvider>> {
228 let status = self
229 .inner
230 .region_map
231 .get(®ion_id)
232 .context(RegionNotFoundSnafu { region_id })?
233 .clone();
234 ensure!(
235 matches!(status, RegionEngineWithStatus::Ready(_)),
236 RegionNotReadySnafu { region_id }
237 );
238
239 self.inner
240 .table_provider_factory
241 .create(region_id, status.into_engine(), ctx)
242 .await
243 .context(ExecuteLogicalPlanSnafu)
244 }
245
246 pub async fn handle_remote_read(
248 &self,
249 request: api::v1::region::QueryRequest,
250 query_ctx: QueryContextRef,
251 ) -> Result<SendableRecordBatchStream> {
252 let _permit = if let Some(p) = &self.inner.parallelism {
253 Some(p.acquire().await?)
254 } else {
255 None
256 };
257
258 let region_id = RegionId::from_u64(request.region_id);
259 let catalog_list = Arc::new(NameAwareCatalogList::new(
260 self.clone(),
261 region_id,
262 query_ctx.clone(),
263 ));
264
265 if query_ctx.explain_verbose() {
266 common_telemetry::info!("Handle remote read for region: {}", region_id);
267 }
268
269 let decoder = self
270 .inner
271 .query_engine
272 .engine_context(query_ctx.clone())
273 .new_plan_decoder()
274 .context(NewPlanDecoderSnafu)?;
275
276 let plan = decoder
277 .decode(Bytes::from(request.plan), catalog_list, false)
278 .await
279 .context(DecodeLogicalPlanSnafu)?;
280
281 self.inner
282 .handle_read(
283 QueryRequest {
284 header: request.header,
285 region_id,
286 plan,
287 },
288 query_ctx,
289 )
290 .await
291 }
292
293 #[tracing::instrument(skip_all)]
294 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
295 let _permit = if let Some(p) = &self.inner.parallelism {
296 Some(p.acquire().await?)
297 } else {
298 None
299 };
300
301 let ctx = request.header.as_ref().map(|h| h.into());
302 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
303
304 let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
305 .context(DataFusionSnafu)?;
306 let mut injector = injector_builder
307 .build(self, request.region_id, query_ctx.clone())
308 .await?;
309
310 let plan = request
311 .plan
312 .rewrite(&mut injector)
313 .context(DataFusionSnafu)?
314 .data;
315
316 self.inner
317 .handle_read(QueryRequest { plan, ..request }, query_ctx)
318 .await
319 }
320
321 pub fn reportable_regions(&self) -> Vec<RegionStat> {
325 self.inner
326 .region_map
327 .iter()
328 .filter_map(|e| {
329 let region_id = *e.key();
330 e.role(region_id).map(|role| RegionStat {
332 region_id,
333 engine: e.value().name().to_string(),
334 role,
335 })
336 })
337 .collect()
338 }
339
340 pub fn topic_stats(&self) -> Vec<TopicStat> {
342 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
343 let Some(reporter) = reporter.as_mut() else {
344 return vec![];
345 };
346 reporter
347 .reportable_topics()
348 .into_iter()
349 .map(|stat| TopicStat {
350 topic_name: stat.topic,
351 record_size: stat.record_size,
352 record_num: stat.record_num,
353 latest_entry_id: stat.latest_entry_id,
354 })
355 .collect()
356 }
357
358 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
359 self.inner.region_map.get(®ion_id).and_then(|engine| {
360 engine.role(region_id).map(|role| match role {
361 RegionRole::Follower => false,
362 RegionRole::Leader => true,
363 RegionRole::DowngradingLeader => true,
364 })
365 })
366 }
367
368 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
369 let engine = self
370 .inner
371 .region_map
372 .get(®ion_id)
373 .with_context(|| RegionNotFoundSnafu { region_id })?;
374 engine
375 .set_region_role(region_id, role)
376 .with_context(|_| HandleRegionRequestSnafu { region_id })
377 }
378
379 pub async fn set_region_role_state_gracefully(
389 &self,
390 region_id: RegionId,
391 state: SettableRegionRoleState,
392 ) -> Result<SetRegionRoleStateResponse> {
393 match self.inner.region_map.get(®ion_id) {
394 Some(engine) => Ok(engine
395 .set_region_role_state_gracefully(region_id, state)
396 .await
397 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
398 None => Ok(SetRegionRoleStateResponse::NotFound),
399 }
400 }
401
    /// Returns a clone of the runtime this server was built with.
    pub fn runtime(&self) -> Runtime {
        self.inner.runtime.clone()
    }
405
406 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
407 match self.inner.region_map.get(®ion_id) {
408 Some(e) => e.region_statistic(region_id),
409 None => None,
410 }
411 }
412
    /// Stops the region server by delegating to the inner server's `stop`.
    pub async fn stop(&self) -> Result<()> {
        self.inner.stop().await
    }
417
418 #[cfg(test)]
419 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
421 {
422 let mut engines = self.inner.engines.write().unwrap();
423 if !engines.contains_key(engine.name()) {
424 debug!("Registering test engine: {}", engine.name());
425 engines.insert(engine.name().to_string(), engine.clone());
426 }
427 }
428
429 self.inner
430 .region_map
431 .insert(region_id, RegionEngineWithStatus::Ready(engine));
432 }
433
434 async fn handle_batch_ddl_requests(
435 &self,
436 request: region_request::Body,
437 ) -> Result<RegionResponse> {
438 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
440 .context(BuildRegionRequestsSnafu)?
441 .unwrap();
442 let tracing_context = TracingContext::from_current_span();
443
444 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
445 self.inner
446 .handle_batch_request(batch_request)
447 .trace(span)
448 .await
449 }
450
451 async fn handle_requests_in_parallel(
452 &self,
453 request: region_request::Body,
454 ) -> Result<RegionResponse> {
455 let requests =
456 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
457
458 match self.try_handle_metric_batch_puts(requests).await? {
461 Either::Left(response) => Ok(response),
462 Either::Right(requests) => {
463 let tracing_context = TracingContext::from_current_span();
465 let join_tasks =
466 requests
467 .into_iter()
468 .map(|(region_id, req): (RegionId, RegionRequest)| {
469 let self_to_move = self;
470 let span = tracing_context.attach(info_span!(
471 "RegionServer::handle_region_request",
472 region_id = region_id.to_string()
473 ));
474 async move {
475 self_to_move
476 .handle_request(region_id, req)
477 .trace(span)
478 .await
479 }
480 });
481
482 let results = try_join_all(join_tasks).await?;
483 let mut affected_rows = 0;
484 let mut extensions = HashMap::new();
485 for result in results {
486 affected_rows += result.affected_rows;
487 extensions.extend(result.extensions);
488 }
489
490 Ok(RegionResponse {
491 affected_rows,
492 extensions,
493 metadata: Vec::new(),
494 })
495 }
496 }
497 }
498
499 async fn handle_requests_in_serial(
500 &self,
501 request: region_request::Body,
502 ) -> Result<RegionResponse> {
503 let requests =
504 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
505 let tracing_context = TracingContext::from_current_span();
506
507 let mut affected_rows = 0;
508 let mut extensions = HashMap::new();
509 for (region_id, req) in requests {
510 let span = tracing_context.attach(info_span!(
511 "RegionServer::handle_region_request",
512 region_id = region_id.to_string()
513 ));
514 let result = self.handle_request(region_id, req).trace(span).await?;
515
516 affected_rows += result.affected_rows;
517 extensions.extend(result.extensions);
518 }
519
520 Ok(RegionResponse {
521 affected_rows,
522 extensions,
523 metadata: Vec::new(),
524 })
525 }
526
527 async fn try_handle_metric_batch_puts(
534 &self,
535 requests: Vec<(RegionId, RegionRequest)>,
536 ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
537 if requests.is_empty() {
538 return Ok(Either::Right(requests));
539 }
540
541 if !matches!(requests[0].1, RegionRequest::Put(_)) {
543 return Ok(Either::Right(requests));
544 }
545 let first_region_id = requests[0].0;
546 let request_type = requests[0].1.request_type();
547
548 let engine = match self
552 .inner
553 .get_engine(first_region_id, &RegionChange::None)?
554 {
555 CurrentEngine::Engine(e) => e,
556 _ => return Ok(Either::Right(requests)),
557 };
558
559 if engine.name() != METRIC_ENGINE_NAME {
560 return Ok(Either::Right(requests));
561 }
562
563 let mut all_puts = true;
565 for (_, req) in &requests {
566 if !matches!(req, RegionRequest::Put(_)) {
567 all_puts = false;
568 break;
569 }
570 }
571
572 if !all_puts {
573 return Ok(Either::Right(requests));
574 }
575
576 let put_requests = requests.into_iter().map(|(region_id, req)| {
578 if let RegionRequest::Put(put) = req {
579 (region_id, put)
580 } else {
581 unreachable!("Already checked all are Put")
582 }
583 });
584
585 let metric_engine =
587 engine
588 .as_any()
589 .downcast_ref::<MetricEngine>()
590 .context(UnexpectedSnafu {
591 violated: "Failed to downcast to MetricEngine",
592 })?;
593
594 let tracing_context = TracingContext::from_current_span();
595 let batch_size = put_requests.len();
596 let span = tracing_context.attach(info_span!(
597 "RegionServer::handle_metric_batch_puts",
598 batch_size = batch_size,
599 ));
600 let result = metric_engine
601 .put_regions_batch(put_requests)
602 .trace(span)
603 .await
604 .map_err(BoxedError::new)
605 .context(HandleRegionRequestSnafu {
606 region_id: first_region_id,
607 });
608
609 match result {
610 Ok(total_affected) => {
611 crate::metrics::REGION_CHANGED_ROW_COUNT
612 .with_label_values(&[request_type])
613 .inc_by(total_affected as u64);
614 Ok(Either::Left(RegionResponse::new(total_affected)))
615 }
616 Err(err) => {
617 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
618 .with_label_values(&[request_type])
619 .inc_by(batch_size as u64);
620 Err(err)
621 }
622 }
623 }
624
625 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
626 let region_id = RegionId::from_u64(request.region_id);
627 let manifest_info = request
628 .manifest_info
629 .context(error::MissingRequiredFieldSnafu {
630 name: "manifest_info",
631 })?;
632
633 let manifest_info = match manifest_info {
634 ManifestInfo::MitoManifestInfo(info) => {
635 RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
636 }
637 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
638 info.data_manifest_version,
639 0,
640 info.metadata_manifest_version,
641 0,
642 ),
643 };
644
645 let tracing_context = TracingContext::from_current_span();
646 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
647
648 self.sync_region(
649 region_id,
650 SyncRegionFromRequest::from_manifest(manifest_info),
651 )
652 .trace(span)
653 .await
654 .map(|_| RegionResponse::new(AffectedRows::default()))
655 }
656
657 #[tracing::instrument(skip_all)]
662 async fn handle_list_metadata_request(
663 &self,
664 request: &ListMetadataRequest,
665 ) -> Result<RegionResponse> {
666 let mut region_metadatas = Vec::new();
667 for region_id in &request.region_ids {
669 let region_id = RegionId::from_u64(*region_id);
670 let Some(engine) = self.find_engine(region_id)? else {
672 region_metadatas.push(None);
673 continue;
674 };
675
676 match engine.get_metadata(region_id).await {
677 Ok(metadata) => region_metadatas.push(Some(metadata)),
678 Err(err) => {
679 if err.status_code() == StatusCode::RegionNotFound {
680 region_metadatas.push(None);
681 } else {
682 Err(err).with_context(|_| GetRegionMetadataSnafu {
683 engine: engine.name(),
684 region_id,
685 })?;
686 }
687 }
688 }
689 }
690
691 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
693
694 let response = RegionResponse::from_metadata(json_result);
695
696 Ok(response)
697 }
698
699 pub async fn sync_region(
701 &self,
702 region_id: RegionId,
703 request: SyncRegionFromRequest,
704 ) -> Result<()> {
705 let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
706 CurrentEngine::Engine(engine) => engine,
707 _ => {
708 return UnexpectedSnafu {
709 violated: "unexpected EarlyReturn engine status for a ready region",
710 }
711 .fail();
712 }
713 };
714
715 self.inner
716 .handle_sync_region(&engine, region_id, request)
717 .await
718 }
719
720 pub async fn remap_manifests(
722 &self,
723 request: RemapManifestsRequest,
724 ) -> Result<RemapManifestsResponse> {
725 let region_id = request.region_id;
726 let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
727 CurrentEngine::Engine(engine) => engine,
728 _ => {
729 return UnexpectedSnafu {
730 violated: "unexpected EarlyReturn engine status for a ready region",
731 }
732 .fail();
733 }
734 };
735
736 engine
737 .remap_manifests(request)
738 .await
739 .with_context(|_| HandleRegionRequestSnafu { region_id })
740 }
741
    /// Reads the suspend flag (relaxed ordering).
    fn is_suspended(&self) -> bool {
        self.suspend.load(Ordering::Relaxed)
    }
745
    /// Returns a shared handle to the suspend flag; storing to it changes what
    /// `is_suspended` observes on every clone of this server.
    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }
749}
750
751#[async_trait]
752impl RegionServerHandler for RegionServer {
753 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
754 let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
755 .with_label_values(&[request.as_ref()]);
756 let response = match &request {
757 region_request::Body::Creates(_)
758 | region_request::Body::Drops(_)
759 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
760 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
761 self.handle_requests_in_parallel(request).await
762 }
763 region_request::Body::Sync(sync_request) => {
764 self.handle_sync_region_request(sync_request).await
765 }
766 region_request::Body::ListMetadata(list_metadata_request) => {
767 self.handle_list_metadata_request(list_metadata_request)
768 .await
769 }
770 _ => self.handle_requests_in_serial(request).await,
771 }
772 .map_err(BoxedError::new)
773 .inspect_err(|_| {
774 failed_requests_cnt.inc();
775 })
776 .context(ExecuteGrpcRequestSnafu)?;
777
778 Ok(RegionResponseV1 {
779 header: Some(ResponseHeader {
780 status: Some(Status {
781 status_code: StatusCode::Success as _,
782 ..Default::default()
783 }),
784 }),
785 affected_rows: response.affected_rows as _,
786 extensions: response.extensions,
787 metadata: response.metadata,
788 })
789 }
790}
791
792#[async_trait]
793impl FlightCraft for RegionServer {
794 async fn do_get(
795 &self,
796 request: Request<Ticket>,
797 ) -> TonicResult<Response<TonicStream<FlightData>>> {
798 ensure!(!self.is_suspended(), SuspendedSnafu);
799
800 let ticket = request.into_inner().ticket;
801 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
802 .context(servers_error::InvalidFlightTicketSnafu)?;
803 let tracing_context = request
804 .header
805 .as_ref()
806 .map(|h| TracingContext::from_w3c(&h.tracing_context))
807 .unwrap_or_default();
808 let query_ctx = request
809 .header
810 .as_ref()
811 .map(|h| Arc::new(QueryContext::from(h)))
812 .unwrap_or(QueryContext::arc());
813
814 let result = self
815 .handle_remote_read(request, query_ctx.clone())
816 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
817 .await?;
818
819 let stream = Box::pin(FlightRecordBatchStream::new(
820 result,
821 tracing_context,
822 self.flight_compression,
823 query_ctx,
824 ));
825 Ok(Response::new(stream))
826 }
827}
828
/// A region's engine tagged with the region's lifecycle state in the region map.
///
/// `RegionServerInner::get_engine` uses the state to decide whether a request
/// may proceed, is rejected as busy / not ready / not found, or returns early.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// The region is being registered and is not ready for regular requests yet.
    Registering(RegionEngineRef),
    /// The region is being deregistered; regular requests treat it as not found.
    Deregistering(RegionEngineRef),
    /// The region is ready to serve requests.
    Ready(RegionEngineRef),
}
838
839impl RegionEngineWithStatus {
840 pub fn into_engine(self) -> RegionEngineRef {
842 match self {
843 RegionEngineWithStatus::Registering(engine) => engine,
844 RegionEngineWithStatus::Deregistering(engine) => engine,
845 RegionEngineWithStatus::Ready(engine) => engine,
846 }
847 }
848}
849
850impl Deref for RegionEngineWithStatus {
851 type Target = RegionEngineRef;
852
853 fn deref(&self) -> &Self::Target {
854 match self {
855 RegionEngineWithStatus::Registering(engine) => engine,
856 RegionEngineWithStatus::Deregistering(engine) => engine,
857 RegionEngineWithStatus::Ready(engine) => engine,
858 }
859 }
860}
861
/// State shared by every clone of `RegionServer`.
struct RegionServerInner {
    /// Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Engine and lifecycle state of every region known to this server.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine used to build plan decoders.
    query_engine: QueryEngineRef,
    /// Runtime handed out by `RegionServer::runtime`.
    runtime: Runtime,
    /// Listener supplied at construction time.
    event_listener: RegionServerEventListenerRef,
    /// Factory used to create table providers for ready regions.
    table_provider_factory: TableProviderFactoryRef,
    /// Concurrent-query limiter; `None` means queries are not limited.
    parallelism: Option<RegionServerParallelism>,
    /// Optional reporter backing `RegionServer::topic_stats`.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// Cached Mito engine, filled in when an engine named `MITO_ENGINE_NAME`
    /// that downcasts to `MitoEngine` is registered.
    mito_engine: RwLock<Option<MitoEngine>>,
}
879
/// Limits the number of queries that run concurrently.
struct RegionServerParallelism {
    /// One permit per allowed concurrent query.
    semaphore: Semaphore,
    /// Maximum time `acquire` waits for a permit before failing.
    timeout: Duration,
}
884
885impl RegionServerParallelism {
886 pub fn from_opts(
887 max_concurrent_queries: usize,
888 concurrent_query_limiter_timeout: Duration,
889 ) -> Option<Self> {
890 if max_concurrent_queries == 0 {
891 return None;
892 }
893 Some(RegionServerParallelism {
894 semaphore: Semaphore::new(max_concurrent_queries),
895 timeout: concurrent_query_limiter_timeout,
896 })
897 }
898
899 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
900 timeout(self.timeout, self.semaphore.acquire())
901 .await
902 .context(ConcurrentQueryLimiterTimeoutSnafu)?
903 .context(ConcurrentQueryLimiterClosedSnafu)
904 }
905}
906
/// Outcome of `RegionServerInner::get_engine`.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// No engine call is needed; carries the affected-row count to report.
    EarlyReturn(AffectedRows),
}
911
912impl Debug for CurrentEngine {
913 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
914 match self {
915 CurrentEngine::Engine(engine) => f
916 .debug_struct("CurrentEngine")
917 .field("engine", &engine.name())
918 .finish(),
919 CurrentEngine::EarlyReturn(rows) => f
920 .debug_struct("CurrentEngine")
921 .field("return", rows)
922 .finish(),
923 }
924 }
925}
926
927impl RegionServerInner {
    /// Creates the shared state with no engines, no regions, no topic stats
    /// reporter and no cached Mito engine.
    pub fn new(
        query_engine: QueryEngineRef,
        runtime: Runtime,
        event_listener: RegionServerEventListenerRef,
        table_provider_factory: TableProviderFactoryRef,
        parallelism: Option<RegionServerParallelism>,
    ) -> Self {
        Self {
            engines: RwLock::new(HashMap::new()),
            region_map: DashMap::new(),
            query_engine,
            runtime,
            event_listener,
            table_provider_factory,
            parallelism,
            topic_stats_reporter: RwLock::new(None),
            mito_engine: RwLock::new(None),
        }
    }
947
948 pub fn register_engine(&self, engine: RegionEngineRef) {
949 let engine_name = engine.name();
950 if engine_name == MITO_ENGINE_NAME
951 && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
952 {
953 *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
954 }
955
956 info!("Region Engine {engine_name} is registered");
957 self.engines
958 .write()
959 .unwrap()
960 .insert(engine_name.to_string(), engine);
961 }
962
    /// Stores the topic stats reporter, replacing any previously set one.
    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
        info!("Set topic stats reporter");
        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
    }
967
968 fn get_engine(
969 &self,
970 region_id: RegionId,
971 region_change: &RegionChange,
972 ) -> Result<CurrentEngine> {
973 let current_region_status = self.region_map.get(®ion_id);
974
975 let engine = match region_change {
976 RegionChange::Register(attribute) => match current_region_status {
977 Some(status) => match status.clone() {
978 RegionEngineWithStatus::Registering(engine) => engine,
979 RegionEngineWithStatus::Deregistering(_) => {
980 return error::RegionBusySnafu { region_id }.fail();
981 }
982 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
983 },
984 _ => self
985 .engines
986 .read()
987 .unwrap()
988 .get(attribute.engine())
989 .with_context(|| RegionEngineNotFoundSnafu {
990 name: attribute.engine(),
991 })?
992 .clone(),
993 },
994 RegionChange::Deregisters => match current_region_status {
995 Some(status) => match status.clone() {
996 RegionEngineWithStatus::Registering(_) => {
997 return error::RegionBusySnafu { region_id }.fail();
998 }
999 RegionEngineWithStatus::Deregistering(_) => {
1000 return Ok(CurrentEngine::EarlyReturn(0));
1001 }
1002 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1003 },
1004 None => return Ok(CurrentEngine::EarlyReturn(0)),
1005 },
1006 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1007 match current_region_status {
1008 Some(status) => match status.clone() {
1009 RegionEngineWithStatus::Registering(_) => {
1010 return error::RegionNotReadySnafu { region_id }.fail();
1011 }
1012 RegionEngineWithStatus::Deregistering(_) => {
1013 return error::RegionNotFoundSnafu { region_id }.fail();
1014 }
1015 RegionEngineWithStatus::Ready(engine) => engine,
1016 },
1017 None => return error::RegionNotFoundSnafu { region_id }.fail(),
1018 }
1019 }
1020 };
1021
1022 Ok(CurrentEngine::Engine(engine))
1023 }
1024
1025 async fn handle_batch_open_requests_inner(
1026 &self,
1027 engine: RegionEngineRef,
1028 parallelism: usize,
1029 requests: Vec<(RegionId, RegionOpenRequest)>,
1030 ignore_nonexistent_region: bool,
1031 ) -> Result<Vec<RegionId>> {
1032 let region_changes = requests
1033 .iter()
1034 .map(|(region_id, open)| {
1035 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1036 Ok((*region_id, RegionChange::Register(attribute)))
1037 })
1038 .collect::<Result<HashMap<_, _>>>()?;
1039
1040 for (®ion_id, region_change) in ®ion_changes {
1041 self.set_region_status_not_ready(region_id, &engine, region_change)
1042 }
1043
1044 let mut open_regions = Vec::with_capacity(requests.len());
1045 let mut errors = vec![];
1046 match engine
1047 .handle_batch_open_requests(parallelism, requests)
1048 .await
1049 .with_context(|_| HandleBatchOpenRequestSnafu)
1050 {
1051 Ok(results) => {
1052 for (region_id, result) in results {
1053 let region_change = ®ion_changes[®ion_id];
1054 match result {
1055 Ok(_) => {
1056 if let Err(e) = self
1057 .set_region_status_ready(region_id, engine.clone(), *region_change)
1058 .await
1059 {
1060 error!(e; "Failed to set region to ready: {}", region_id);
1061 errors.push(BoxedError::new(e));
1062 } else {
1063 open_regions.push(region_id)
1064 }
1065 }
1066 Err(e) => {
1067 self.unset_region_status(region_id, &engine, *region_change);
1068 if e.status_code() == StatusCode::RegionNotFound
1069 && ignore_nonexistent_region
1070 {
1071 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1072 } else {
1073 error!(e; "Failed to open region: {}", region_id);
1074 errors.push(e);
1075 }
1076 }
1077 }
1078 }
1079 }
1080 Err(e) => {
1081 for (®ion_id, region_change) in ®ion_changes {
1082 self.unset_region_status(region_id, &engine, *region_change);
1083 }
1084 error!(e; "Failed to open batch regions");
1085 errors.push(BoxedError::new(e));
1086 }
1087 }
1088
1089 if !errors.is_empty() {
1090 return error::UnexpectedSnafu {
1091 violated: format!("Failed to open batch regions: {:?}", errors[0]),
1093 }
1094 .fail();
1095 }
1096
1097 Ok(open_regions)
1098 }
1099
1100 pub async fn handle_batch_open_requests(
1101 &self,
1102 parallelism: usize,
1103 requests: Vec<(RegionId, RegionOpenRequest)>,
1104 ignore_nonexistent_region: bool,
1105 ) -> Result<Vec<RegionId>> {
1106 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1107 HashMap::with_capacity(requests.len());
1108 for (region_id, request) in requests {
1109 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1110 requests.push((region_id, request));
1111 } else {
1112 engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1113 }
1114 }
1115
1116 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1117 for (engine, requests) in engine_grouped_requests {
1118 let engine = self
1119 .engines
1120 .read()
1121 .unwrap()
1122 .get(&engine)
1123 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1124 .clone();
1125 results.push(
1126 self.handle_batch_open_requests_inner(
1127 engine,
1128 parallelism,
1129 requests,
1130 ignore_nonexistent_region,
1131 )
1132 .await,
1133 )
1134 }
1135
1136 Ok(results
1137 .into_iter()
1138 .collect::<Result<Vec<_>>>()?
1139 .into_iter()
1140 .flatten()
1141 .collect::<Vec<_>>())
1142 }
1143
1144 pub async fn handle_batch_catchup_requests_inner(
1145 &self,
1146 engine: RegionEngineRef,
1147 parallelism: usize,
1148 requests: Vec<(RegionId, RegionCatchupRequest)>,
1149 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1150 for (region_id, _) in &requests {
1151 self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1152 }
1153 let region_ids = requests
1154 .iter()
1155 .map(|(region_id, _)| *region_id)
1156 .collect::<Vec<_>>();
1157 let mut responses = Vec::with_capacity(requests.len());
1158 match engine
1159 .handle_batch_catchup_requests(parallelism, requests)
1160 .await
1161 {
1162 Ok(results) => {
1163 for (region_id, result) in results {
1164 match result {
1165 Ok(_) => {
1166 if let Err(e) = self
1167 .set_region_status_ready(
1168 region_id,
1169 engine.clone(),
1170 RegionChange::Catchup,
1171 )
1172 .await
1173 {
1174 error!(e; "Failed to set region to ready: {}", region_id);
1175 responses.push((region_id, Err(BoxedError::new(e))));
1176 } else {
1177 responses.push((region_id, Ok(())));
1178 }
1179 }
1180 Err(e) => {
1181 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1182 error!(e; "Failed to catchup region: {}", region_id);
1183 responses.push((region_id, Err(e)));
1184 }
1185 }
1186 }
1187 }
1188 Err(e) => {
1189 for region_id in region_ids {
1190 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1191 }
1192 error!(e; "Failed to catchup batch regions");
1193 return error::UnexpectedSnafu {
1194 violated: format!("Failed to catchup batch regions: {:?}", e),
1195 }
1196 .fail();
1197 }
1198 }
1199
1200 Ok(responses)
1201 }
1202
1203 pub async fn handle_batch_catchup_requests(
1204 &self,
1205 parallelism: usize,
1206 requests: Vec<(RegionId, RegionCatchupRequest)>,
1207 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1208 let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1209
1210 let mut responses = Vec::with_capacity(requests.len());
1211 for (region_id, request) in requests {
1212 if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1213 match engine {
1214 CurrentEngine::Engine(engine) => {
1215 engine_grouped_requests
1216 .entry(engine.name().to_string())
1217 .or_default()
1218 .push((region_id, request));
1219 }
1220 CurrentEngine::EarlyReturn(_) => {
1221 return error::UnexpectedSnafu {
1222 violated: format!("Unexpected engine type for region {}", region_id),
1223 }
1224 .fail();
1225 }
1226 }
1227 } else {
1228 responses.push((
1229 region_id,
1230 Err(BoxedError::new(
1231 error::RegionNotFoundSnafu { region_id }.build(),
1232 )),
1233 ));
1234 }
1235 }
1236
1237 for (engine, requests) in engine_grouped_requests {
1238 let engine = self
1239 .engines
1240 .read()
1241 .unwrap()
1242 .get(&engine)
1243 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1244 .clone();
1245 responses.extend(
1246 self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1247 .await?,
1248 );
1249 }
1250
1251 Ok(responses)
1252 }
1253
1254 pub async fn handle_batch_request(
1258 &self,
1259 batch_request: BatchRegionDdlRequest,
1260 ) -> Result<RegionResponse> {
1261 let region_changes = match &batch_request {
1262 BatchRegionDdlRequest::Create(requests) => requests
1263 .iter()
1264 .map(|(region_id, create)| {
1265 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1266 Ok((*region_id, RegionChange::Register(attribute)))
1267 })
1268 .collect::<Result<Vec<_>>>()?,
1269 BatchRegionDdlRequest::Drop(requests) => requests
1270 .iter()
1271 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1272 .collect::<Vec<_>>(),
1273 BatchRegionDdlRequest::Alter(requests) => requests
1274 .iter()
1275 .map(|(region_id, _)| (*region_id, RegionChange::None))
1276 .collect::<Vec<_>>(),
1277 };
1278
1279 let (first_region_id, first_region_change) = region_changes.first().unwrap();
1282 let engine = match self.get_engine(*first_region_id, first_region_change)? {
1283 CurrentEngine::Engine(engine) => engine,
1284 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1285 };
1286
1287 for (region_id, region_change) in region_changes.iter() {
1288 self.set_region_status_not_ready(*region_id, &engine, region_change);
1289 }
1290
1291 let ddl_type = batch_request.request_type();
1292 let result = engine
1293 .handle_batch_ddl_requests(batch_request)
1294 .await
1295 .context(HandleBatchDdlRequestSnafu { ddl_type });
1296
1297 match result {
1298 Ok(result) => {
1299 for (region_id, region_change) in ®ion_changes {
1300 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1301 .await?;
1302 }
1303
1304 Ok(RegionResponse {
1305 affected_rows: result.affected_rows,
1306 extensions: result.extensions,
1307 metadata: Vec::new(),
1308 })
1309 }
1310 Err(err) => {
1311 for (region_id, region_change) in region_changes {
1312 self.unset_region_status(region_id, &engine, region_change);
1313 }
1314
1315 Err(err)
1316 }
1317 }
1318 }
1319
1320 pub async fn handle_request(
1321 &self,
1322 region_id: RegionId,
1323 request: RegionRequest,
1324 ) -> Result<RegionResponse> {
1325 let request_type = request.request_type();
1326 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1327 .with_label_values(&[request_type])
1328 .start_timer();
1329
1330 let region_change = match &request {
1331 RegionRequest::Create(create) => {
1332 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1333 RegionChange::Register(attribute)
1334 }
1335 RegionRequest::Open(open) => {
1336 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1337 RegionChange::Register(attribute)
1338 }
1339 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1340 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1341 RegionChange::Ingest
1342 }
1343 RegionRequest::Alter(_)
1344 | RegionRequest::Flush(_)
1345 | RegionRequest::Compact(_)
1346 | RegionRequest::Truncate(_)
1347 | RegionRequest::BuildIndex(_)
1348 | RegionRequest::EnterStaging(_)
1349 | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1350 RegionRequest::Catchup(_) => RegionChange::Catchup,
1351 };
1352
1353 let engine = match self.get_engine(region_id, ®ion_change)? {
1354 CurrentEngine::Engine(engine) => engine,
1355 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1356 };
1357
1358 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1360
1361 match engine
1362 .handle_request(region_id, request)
1363 .await
1364 .with_context(|_| HandleRegionRequestSnafu { region_id })
1365 {
1366 Ok(result) => {
1367 if matches!(region_change, RegionChange::Ingest) {
1369 crate::metrics::REGION_CHANGED_ROW_COUNT
1370 .with_label_values(&[request_type])
1371 .inc_by(result.affected_rows as u64);
1372 }
1373 self.set_region_status_ready(region_id, engine.clone(), region_change)
1375 .await?;
1376
1377 Ok(RegionResponse {
1378 affected_rows: result.affected_rows,
1379 extensions: result.extensions,
1380 metadata: Vec::new(),
1381 })
1382 }
1383 Err(err) => {
1384 if matches!(region_change, RegionChange::Ingest) {
1385 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1386 .with_label_values(&[request_type])
1387 .inc();
1388 }
1389 self.unset_region_status(region_id, &engine, region_change);
1391 Err(err)
1392 }
1393 }
1394 }
1395
1396 pub async fn handle_sync_region(
1398 &self,
1399 engine: &RegionEngineRef,
1400 region_id: RegionId,
1401 request: SyncRegionFromRequest,
1402 ) -> Result<()> {
1403 let Some(new_opened_regions) = engine
1404 .sync_region(region_id, request)
1405 .await
1406 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1407 .new_opened_logical_region_ids()
1408 else {
1409 return Ok(());
1410 };
1411
1412 for region in &new_opened_regions {
1413 self.region_map
1414 .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1415 }
1416 if !new_opened_regions.is_empty() {
1417 info!(
1418 region_id = %region_id,
1419 logical_region_count = new_opened_regions.len(),
1420 logical_regions = ?new_opened_regions,
1421 "Logical regions are registered"
1422 );
1423 }
1424
1425 Ok(())
1426 }
1427
1428 fn set_region_status_not_ready(
1429 &self,
1430 region_id: RegionId,
1431 engine: &RegionEngineRef,
1432 region_change: &RegionChange,
1433 ) {
1434 match region_change {
1435 RegionChange::Register(_) => {
1436 self.region_map.insert(
1437 region_id,
1438 RegionEngineWithStatus::Registering(engine.clone()),
1439 );
1440 }
1441 RegionChange::Deregisters => {
1442 self.region_map.insert(
1443 region_id,
1444 RegionEngineWithStatus::Deregistering(engine.clone()),
1445 );
1446 }
1447 _ => {}
1448 }
1449 }
1450
1451 fn unset_region_status(
1452 &self,
1453 region_id: RegionId,
1454 engine: &RegionEngineRef,
1455 region_change: RegionChange,
1456 ) {
1457 match region_change {
1458 RegionChange::None | RegionChange::Ingest => {}
1459 RegionChange::Register(_) => {
1460 self.region_map.remove(®ion_id);
1461 }
1462 RegionChange::Deregisters => {
1463 self.region_map
1464 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1465 }
1466 RegionChange::Catchup => {}
1467 }
1468 }
1469
1470 async fn set_region_status_ready(
1471 &self,
1472 region_id: RegionId,
1473 engine: RegionEngineRef,
1474 region_change: RegionChange,
1475 ) -> Result<()> {
1476 let engine_type = engine.name();
1477 match region_change {
1478 RegionChange::None | RegionChange::Ingest => {}
1479 RegionChange::Register(attribute) => {
1480 info!(
1481 "Region {region_id} is registered to engine {}",
1482 attribute.engine()
1483 );
1484 self.region_map
1485 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1486
1487 match attribute {
1488 RegionAttribute::Metric { physical } => {
1489 if physical {
1490 self.register_logical_regions(&engine, region_id).await?;
1492 self.event_listener.on_region_registered(region_id);
1494 }
1495 }
1496 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1497 RegionAttribute::File => {
1498 }
1500 }
1501 }
1502 RegionChange::Deregisters => {
1503 info!("Region {region_id} is deregistered from engine {engine_type}");
1504 self.region_map
1505 .remove(®ion_id)
1506 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1507 self.event_listener.on_region_deregistered(region_id);
1508 }
1509 RegionChange::Catchup => {
1510 if is_metric_engine(engine.name()) {
1511 self.register_logical_regions(&engine, region_id).await?;
1513 }
1514 }
1515 }
1516 Ok(())
1517 }
1518
1519 async fn register_logical_regions(
1520 &self,
1521 engine: &RegionEngineRef,
1522 physical_region_id: RegionId,
1523 ) -> Result<()> {
1524 let metric_engine =
1525 engine
1526 .as_any()
1527 .downcast_ref::<MetricEngine>()
1528 .context(UnexpectedSnafu {
1529 violated: format!(
1530 "expecting engine type '{}', actual '{}'",
1531 METRIC_ENGINE_NAME,
1532 engine.name(),
1533 ),
1534 })?;
1535
1536 let logical_regions = metric_engine
1537 .logical_regions(physical_region_id)
1538 .await
1539 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1540
1541 for region in &logical_regions {
1542 self.region_map
1543 .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1544 }
1545 if !logical_regions.is_empty() {
1546 info!(
1547 physical_region_id = %physical_region_id,
1548 logical_region_count = logical_regions.len(),
1549 logical_regions = ?logical_regions,
1550 "Logical regions are registered"
1551 );
1552 }
1553 Ok(())
1554 }
1555
1556 pub async fn handle_read(
1557 &self,
1558 request: QueryRequest,
1559 query_ctx: QueryContextRef,
1560 ) -> Result<SendableRecordBatchStream> {
1561 let result = self
1564 .query_engine
1565 .execute(request.plan, query_ctx)
1566 .await
1567 .context(ExecuteLogicalPlanSnafu)?;
1568
1569 match result.data {
1570 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1571 UnsupportedOutputSnafu { expected: "stream" }.fail()
1572 }
1573 OutputData::Stream(stream) => Ok(stream),
1574 }
1575 }
1576
1577 async fn stop(&self) -> Result<()> {
1578 let regions = self
1586 .region_map
1587 .iter()
1588 .map(|x| (*x.key(), x.value().clone()))
1589 .collect::<Vec<_>>();
1590 let num_regions = regions.len();
1591
1592 for (region_id, engine) in regions {
1593 let closed = engine
1594 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1595 .await;
1596 match closed {
1597 Ok(_) => debug!("Region {region_id} is closed"),
1598 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1599 }
1600 }
1601 self.region_map.clear();
1602 info!("closed {num_regions} regions");
1603
1604 drop(self.mito_engine.write().unwrap().take());
1605 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1606 for (engine_name, engine) in engines {
1607 engine
1608 .stop()
1609 .await
1610 .context(StopRegionEngineSnafu { name: &engine_name })?;
1611 info!("Region engine {engine_name} is stopped");
1612 }
1613
1614 Ok(())
1615 }
1616}
1617
/// How a request affects a region's registration in the region server.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the region's registration status.
    None,
    /// The region is being registered (create/open) with the given attribute.
    Register(RegionAttribute),
    /// The region is being deregistered (close/drop).
    Deregisters,
    /// The region is catching up; on success the metric engine's logical regions are
    /// registered.
    Catchup,
    /// The request writes rows (put/delete/bulk insert); used for ingest metrics.
    Ingest,
}
1626
/// Returns `true` if `engine` is exactly the metric engine's name.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1630
1631fn parse_region_attribute(
1632 engine: &str,
1633 options: &HashMap<String, String>,
1634) -> Result<RegionAttribute> {
1635 match engine {
1636 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1637 METRIC_ENGINE_NAME => {
1638 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1639
1640 Ok(RegionAttribute::Metric { physical })
1641 }
1642 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1643 _ => error::UnexpectedSnafu {
1644 violated: format!("Unknown engine: {}", engine),
1645 }
1646 .fail(),
1647 }
1648}
1649
/// The kind of region being registered, derived from its engine name and options.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region served by the mito engine.
    Mito,
    /// A region served by the metric engine; `physical` is `false` when the region options
    /// carry the logical-table metadata key.
    Metric { physical: bool },
    /// A region served by the file engine.
    File,
}
1656
1657impl RegionAttribute {
1658 fn engine(&self) -> &'static str {
1659 match self {
1660 RegionAttribute::Mito => MITO_ENGINE_NAME,
1661 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1662 RegionAttribute::File => FILE_ENGINE_NAME,
1663 }
1664 }
1665}
1666
1667#[cfg(test)]
1668mod tests {
1669
1670 use std::assert_matches::assert_matches;
1671
1672 use api::v1::SemanticType;
1673 use common_error::ext::ErrorExt;
1674 use datatypes::prelude::ConcreteDataType;
1675 use mito2::test_util::CreateRequestBuilder;
1676 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1677 use store_api::region_engine::RegionEngine;
1678 use store_api::region_request::{
1679 PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1680 };
1681 use store_api::storage::RegionId;
1682
1683 use super::*;
1684 use crate::error::Result;
1685 use crate::tests::{MockRegionEngine, mock_region_server};
1686
1687 #[tokio::test]
1688 async fn test_region_registering() {
1689 common_telemetry::init_default_ut_logging();
1690
1691 let mut mock_region_server = mock_region_server();
1692 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1693 let engine_name = engine.name();
1694 mock_region_server.register_engine(engine.clone());
1695 let region_id = RegionId::new(1, 1);
1696 let builder = CreateRequestBuilder::new();
1697 let create_req = builder.build();
1698 mock_region_server.inner.region_map.insert(
1700 region_id,
1701 RegionEngineWithStatus::Registering(engine.clone()),
1702 );
1703 let response = mock_region_server
1704 .handle_request(region_id, RegionRequest::Create(create_req))
1705 .await
1706 .unwrap();
1707 assert_eq!(response.affected_rows, 0);
1708 let status = mock_region_server
1709 .inner
1710 .region_map
1711 .get(®ion_id)
1712 .unwrap()
1713 .clone();
1714 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1715
1716 mock_region_server.inner.region_map.insert(
1717 region_id,
1718 RegionEngineWithStatus::Registering(engine.clone()),
1719 );
1720 let response = mock_region_server
1721 .handle_request(
1722 region_id,
1723 RegionRequest::Open(RegionOpenRequest {
1724 engine: engine_name.to_string(),
1725 table_dir: String::new(),
1726 path_type: PathType::Bare,
1727 options: Default::default(),
1728 skip_wal_replay: false,
1729 checkpoint: None,
1730 }),
1731 )
1732 .await
1733 .unwrap();
1734 assert_eq!(response.affected_rows, 0);
1735 let status = mock_region_server
1736 .inner
1737 .region_map
1738 .get(®ion_id)
1739 .unwrap()
1740 .clone();
1741 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1742 }
1743
1744 #[tokio::test]
1745 async fn test_region_deregistering() {
1746 common_telemetry::init_default_ut_logging();
1747
1748 let mut mock_region_server = mock_region_server();
1749 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1750
1751 mock_region_server.register_engine(engine.clone());
1752
1753 let region_id = RegionId::new(1, 1);
1754
1755 mock_region_server.inner.region_map.insert(
1757 region_id,
1758 RegionEngineWithStatus::Deregistering(engine.clone()),
1759 );
1760
1761 let response = mock_region_server
1762 .handle_request(
1763 region_id,
1764 RegionRequest::Drop(RegionDropRequest {
1765 fast_path: false,
1766 force: false,
1767 partial_drop: false,
1768 }),
1769 )
1770 .await
1771 .unwrap();
1772 assert_eq!(response.affected_rows, 0);
1773
1774 let status = mock_region_server
1775 .inner
1776 .region_map
1777 .get(®ion_id)
1778 .unwrap()
1779 .clone();
1780 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1781
1782 mock_region_server.inner.region_map.insert(
1783 region_id,
1784 RegionEngineWithStatus::Deregistering(engine.clone()),
1785 );
1786
1787 let response = mock_region_server
1788 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1789 .await
1790 .unwrap();
1791 assert_eq!(response.affected_rows, 0);
1792
1793 let status = mock_region_server
1794 .inner
1795 .region_map
1796 .get(®ion_id)
1797 .unwrap()
1798 .clone();
1799 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1800 }
1801
1802 #[tokio::test]
1803 async fn test_region_not_ready() {
1804 common_telemetry::init_default_ut_logging();
1805
1806 let mut mock_region_server = mock_region_server();
1807 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1808
1809 mock_region_server.register_engine(engine.clone());
1810
1811 let region_id = RegionId::new(1, 1);
1812
1813 mock_region_server.inner.region_map.insert(
1815 region_id,
1816 RegionEngineWithStatus::Registering(engine.clone()),
1817 );
1818
1819 let err = mock_region_server
1820 .handle_request(
1821 region_id,
1822 RegionRequest::Truncate(RegionTruncateRequest::All),
1823 )
1824 .await
1825 .unwrap_err();
1826
1827 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1828 }
1829
1830 #[tokio::test]
1831 async fn test_region_request_failed() {
1832 common_telemetry::init_default_ut_logging();
1833
1834 let mut mock_region_server = mock_region_server();
1835 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1836 MITO_ENGINE_NAME,
1837 Box::new(|_region_id, _request| {
1838 error::UnexpectedSnafu {
1839 violated: "test".to_string(),
1840 }
1841 .fail()
1842 }),
1843 );
1844
1845 mock_region_server.register_engine(engine.clone());
1846
1847 let region_id = RegionId::new(1, 1);
1848 let builder = CreateRequestBuilder::new();
1849 let create_req = builder.build();
1850 mock_region_server
1851 .handle_request(region_id, RegionRequest::Create(create_req))
1852 .await
1853 .unwrap_err();
1854
1855 let status = mock_region_server.inner.region_map.get(®ion_id);
1856 assert!(status.is_none());
1857
1858 mock_region_server
1859 .inner
1860 .region_map
1861 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1862
1863 mock_region_server
1864 .handle_request(
1865 region_id,
1866 RegionRequest::Drop(RegionDropRequest {
1867 fast_path: false,
1868 force: false,
1869 partial_drop: false,
1870 }),
1871 )
1872 .await
1873 .unwrap_err();
1874
1875 let status = mock_region_server.inner.region_map.get(®ion_id);
1876 assert!(status.is_some());
1877 }
1878
1879 #[tokio::test]
1880 async fn test_batch_open_region_ignore_nonexistent_regions() {
1881 common_telemetry::init_default_ut_logging();
1882 let mut mock_region_server = mock_region_server();
1883 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1884 MITO_ENGINE_NAME,
1885 Box::new(|region_id, _request| {
1886 if region_id == RegionId::new(1, 1) {
1887 error::RegionNotFoundSnafu { region_id }.fail()
1888 } else {
1889 Ok(0)
1890 }
1891 }),
1892 );
1893 mock_region_server.register_engine(engine.clone());
1894
1895 let region_ids = mock_region_server
1896 .handle_batch_open_requests(
1897 8,
1898 vec![
1899 (
1900 RegionId::new(1, 1),
1901 RegionOpenRequest {
1902 engine: MITO_ENGINE_NAME.to_string(),
1903 table_dir: String::new(),
1904 path_type: PathType::Bare,
1905 options: Default::default(),
1906 skip_wal_replay: false,
1907 checkpoint: None,
1908 },
1909 ),
1910 (
1911 RegionId::new(1, 2),
1912 RegionOpenRequest {
1913 engine: MITO_ENGINE_NAME.to_string(),
1914 table_dir: String::new(),
1915 path_type: PathType::Bare,
1916 options: Default::default(),
1917 skip_wal_replay: false,
1918 checkpoint: None,
1919 },
1920 ),
1921 ],
1922 true,
1923 )
1924 .await
1925 .unwrap();
1926 assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1927
1928 let err = mock_region_server
1929 .handle_batch_open_requests(
1930 8,
1931 vec![
1932 (
1933 RegionId::new(1, 1),
1934 RegionOpenRequest {
1935 engine: MITO_ENGINE_NAME.to_string(),
1936 table_dir: String::new(),
1937 path_type: PathType::Bare,
1938 options: Default::default(),
1939 skip_wal_replay: false,
1940 checkpoint: None,
1941 },
1942 ),
1943 (
1944 RegionId::new(1, 2),
1945 RegionOpenRequest {
1946 engine: MITO_ENGINE_NAME.to_string(),
1947 table_dir: String::new(),
1948 path_type: PathType::Bare,
1949 options: Default::default(),
1950 skip_wal_replay: false,
1951 checkpoint: None,
1952 },
1953 ),
1954 ],
1955 false,
1956 )
1957 .await
1958 .unwrap_err();
1959 assert_eq!(err.status_code(), StatusCode::Unexpected);
1960 }
1961
    /// One table-driven case for `test_current_engine`.
    struct CurrentEngineTest {
        /// Region the engine is looked up for.
        region_id: RegionId,
        /// Status to place in the region map before the lookup; `None` removes the entry.
        current_region_status: Option<RegionEngineWithStatus>,
        /// The region change passed to `get_engine`.
        region_change: RegionChange,
        /// Checks the result returned by `get_engine`.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
1968
1969 #[tokio::test]
1970 async fn test_current_engine() {
1971 common_telemetry::init_default_ut_logging();
1972
1973 let mut mock_region_server = mock_region_server();
1974 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1975 mock_region_server.register_engine(engine.clone());
1976
1977 let region_id = RegionId::new(1024, 1);
1978 let tests = vec![
1979 CurrentEngineTest {
1981 region_id,
1982 current_region_status: None,
1983 region_change: RegionChange::None,
1984 assert: Box::new(|result| {
1985 let err = result.unwrap_err();
1986 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1987 }),
1988 },
1989 CurrentEngineTest {
1990 region_id,
1991 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1992 region_change: RegionChange::None,
1993 assert: Box::new(|result| {
1994 let current_engine = result.unwrap();
1995 assert_matches!(current_engine, CurrentEngine::Engine(_));
1996 }),
1997 },
1998 CurrentEngineTest {
1999 region_id,
2000 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2001 region_change: RegionChange::None,
2002 assert: Box::new(|result| {
2003 let err = result.unwrap_err();
2004 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2005 }),
2006 },
2007 CurrentEngineTest {
2008 region_id,
2009 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2010 region_change: RegionChange::None,
2011 assert: Box::new(|result| {
2012 let err = result.unwrap_err();
2013 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2014 }),
2015 },
2016 CurrentEngineTest {
2018 region_id,
2019 current_region_status: None,
2020 region_change: RegionChange::Register(RegionAttribute::Mito),
2021 assert: Box::new(|result| {
2022 let current_engine = result.unwrap();
2023 assert_matches!(current_engine, CurrentEngine::Engine(_));
2024 }),
2025 },
2026 CurrentEngineTest {
2027 region_id,
2028 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2029 region_change: RegionChange::Register(RegionAttribute::Mito),
2030 assert: Box::new(|result| {
2031 let current_engine = result.unwrap();
2032 assert_matches!(current_engine, CurrentEngine::Engine(_));
2033 }),
2034 },
2035 CurrentEngineTest {
2036 region_id,
2037 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2038 region_change: RegionChange::Register(RegionAttribute::Mito),
2039 assert: Box::new(|result| {
2040 let err = result.unwrap_err();
2041 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2042 }),
2043 },
2044 CurrentEngineTest {
2045 region_id,
2046 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2047 region_change: RegionChange::Register(RegionAttribute::Mito),
2048 assert: Box::new(|result| {
2049 let current_engine = result.unwrap();
2050 assert_matches!(current_engine, CurrentEngine::Engine(_));
2051 }),
2052 },
2053 CurrentEngineTest {
2055 region_id,
2056 current_region_status: None,
2057 region_change: RegionChange::Deregisters,
2058 assert: Box::new(|result| {
2059 let current_engine = result.unwrap();
2060 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2061 }),
2062 },
2063 CurrentEngineTest {
2064 region_id,
2065 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2066 region_change: RegionChange::Deregisters,
2067 assert: Box::new(|result| {
2068 let err = result.unwrap_err();
2069 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2070 }),
2071 },
2072 CurrentEngineTest {
2073 region_id,
2074 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2075 region_change: RegionChange::Deregisters,
2076 assert: Box::new(|result| {
2077 let current_engine = result.unwrap();
2078 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2079 }),
2080 },
2081 CurrentEngineTest {
2082 region_id,
2083 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2084 region_change: RegionChange::Deregisters,
2085 assert: Box::new(|result| {
2086 let current_engine = result.unwrap();
2087 assert_matches!(current_engine, CurrentEngine::Engine(_));
2088 }),
2089 },
2090 ];
2091
2092 for test in tests {
2093 let CurrentEngineTest {
2094 region_id,
2095 current_region_status,
2096 region_change,
2097 assert,
2098 } = test;
2099
2100 if let Some(status) = current_region_status {
2102 mock_region_server
2103 .inner
2104 .region_map
2105 .insert(region_id, status);
2106 } else {
2107 mock_region_server.inner.region_map.remove(®ion_id);
2108 }
2109
2110 let result = mock_region_server
2111 .inner
2112 .get_engine(region_id, ®ion_change);
2113
2114 assert(result);
2115 }
2116 }
2117
2118 #[tokio::test]
2119 async fn test_region_server_parallelism() {
2120 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2121 let first_query = p.acquire().await;
2122 assert!(first_query.is_ok());
2123 let second_query = p.acquire().await;
2124 assert!(second_query.is_ok());
2125 let third_query = p.acquire().await;
2126 assert!(third_query.is_err());
2127 let err = third_query.unwrap_err();
2128 assert_eq!(
2129 err.output_msg(),
2130 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2131 );
2132 drop(first_query);
2133 let forth_query = p.acquire().await;
2134 assert!(forth_query.is_ok());
2135 }
2136
2137 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2138 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2139 metadata_builder.push_column_metadata(ColumnMetadata {
2140 column_schema: datatypes::schema::ColumnSchema::new(
2141 "timestamp",
2142 ConcreteDataType::timestamp_nanosecond_datatype(),
2143 false,
2144 ),
2145 semantic_type: SemanticType::Timestamp,
2146 column_id: 0,
2147 });
2148 metadata_builder.push_column_metadata(ColumnMetadata {
2149 column_schema: datatypes::schema::ColumnSchema::new(
2150 "file",
2151 ConcreteDataType::string_datatype(),
2152 true,
2153 ),
2154 semantic_type: SemanticType::Tag,
2155 column_id: 1,
2156 });
2157 metadata_builder.push_column_metadata(ColumnMetadata {
2158 column_schema: datatypes::schema::ColumnSchema::new(
2159 "message",
2160 ConcreteDataType::string_datatype(),
2161 true,
2162 ),
2163 semantic_type: SemanticType::Field,
2164 column_id: 2,
2165 });
2166 metadata_builder.primary_key(vec![1]);
2167 metadata_builder.build().unwrap()
2168 }
2169
2170 #[tokio::test]
2171 async fn test_handle_list_metadata_request() {
2172 common_telemetry::init_default_ut_logging();
2173
2174 let mut mock_region_server = mock_region_server();
2175 let region_id_1 = RegionId::new(1, 0);
2176 let region_id_2 = RegionId::new(2, 0);
2177
2178 let metadata_1 = mock_region_metadata(region_id_1);
2179 let metadata_2 = mock_region_metadata(region_id_2);
2180 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2181
2182 let metadata_1 = Arc::new(metadata_1);
2183 let metadata_2 = Arc::new(metadata_2);
2184 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2185 MITO_ENGINE_NAME,
2186 Box::new(move |region_id| {
2187 if region_id == region_id_1 {
2188 Ok(metadata_1.clone())
2189 } else if region_id == region_id_2 {
2190 Ok(metadata_2.clone())
2191 } else {
2192 error::RegionNotFoundSnafu { region_id }.fail()
2193 }
2194 }),
2195 );
2196
2197 mock_region_server.register_engine(engine.clone());
2198 mock_region_server
2199 .inner
2200 .region_map
2201 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2202 mock_region_server
2203 .inner
2204 .region_map
2205 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2206
2207 let list_metadata_request = ListMetadataRequest {
2209 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2210 };
2211 let response = mock_region_server
2212 .handle_list_metadata_request(&list_metadata_request)
2213 .await
2214 .unwrap();
2215 let decoded_metadata: Vec<Option<RegionMetadata>> =
2216 serde_json::from_slice(&response.metadata).unwrap();
2217 assert_eq!(metadatas, decoded_metadata);
2218 }
2219
2220 #[tokio::test]
2221 async fn test_handle_list_metadata_not_found() {
2222 common_telemetry::init_default_ut_logging();
2223
2224 let mut mock_region_server = mock_region_server();
2225 let region_id_1 = RegionId::new(1, 0);
2226 let region_id_2 = RegionId::new(2, 0);
2227
2228 let metadata_1 = mock_region_metadata(region_id_1);
2229 let metadatas = vec![Some(metadata_1.clone()), None];
2230
2231 let metadata_1 = Arc::new(metadata_1);
2232 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2233 MITO_ENGINE_NAME,
2234 Box::new(move |region_id| {
2235 if region_id == region_id_1 {
2236 Ok(metadata_1.clone())
2237 } else {
2238 error::RegionNotFoundSnafu { region_id }.fail()
2239 }
2240 }),
2241 );
2242
2243 mock_region_server.register_engine(engine.clone());
2244 mock_region_server
2245 .inner
2246 .region_map
2247 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2248
2249 let list_metadata_request = ListMetadataRequest {
2251 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2252 };
2253 let response = mock_region_server
2254 .handle_list_metadata_request(&list_metadata_request)
2255 .await
2256 .unwrap();
2257 let decoded_metadata: Vec<Option<RegionMetadata>> =
2258 serde_json::from_slice(&response.metadata).unwrap();
2259 assert_eq!(metadatas, decoded_metadata);
2260
2261 mock_region_server
2263 .inner
2264 .region_map
2265 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2266 let response = mock_region_server
2267 .handle_list_metadata_request(&list_metadata_request)
2268 .await
2269 .unwrap();
2270 let decoded_metadata: Vec<Option<RegionMetadata>> =
2271 serde_json::from_slice(&response.metadata).unwrap();
2272 assert_eq!(metadatas, decoded_metadata);
2273 }
2274
2275 #[tokio::test]
2276 async fn test_handle_list_metadata_failed() {
2277 common_telemetry::init_default_ut_logging();
2278
2279 let mut mock_region_server = mock_region_server();
2280 let region_id_1 = RegionId::new(1, 0);
2281
2282 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2283 MITO_ENGINE_NAME,
2284 Box::new(move |region_id| {
2285 error::UnexpectedSnafu {
2286 violated: format!("Failed to get region {region_id}"),
2287 }
2288 .fail()
2289 }),
2290 );
2291
2292 mock_region_server.register_engine(engine.clone());
2293 mock_region_server
2294 .inner
2295 .region_map
2296 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2297
2298 let list_metadata_request = ListMetadataRequest {
2300 region_ids: vec![region_id_1.as_u64()],
2301 };
2302 mock_region_server
2303 .handle_list_metadata_request(&list_metadata_request)
2304 .await
2305 .unwrap_err();
2306 }
2307}