1mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28 ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use either::Either;
48use futures_util::future::try_join_all;
49use metric_engine::engine::MetricEngine;
50use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
51use prost::Message;
52use query::QueryEngineRef;
53pub use query::dummy_catalog::{
54 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
55};
56use serde_json;
57use servers::error::{
58 self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
59};
60use servers::grpc::FlightCompression;
61use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
62use servers::grpc::region_server::RegionServerHandler;
63use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metric_engine_consts::{
66 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
67};
68use store_api::region_engine::{
69 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
70 RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
71 SyncRegionFromRequest,
72};
73use store_api::region_request::{
74 AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
75 RegionOpenRequest, RegionRequest,
76};
77use store_api::storage::RegionId;
78use tokio::sync::{Semaphore, SemaphorePermit};
79use tokio::time::timeout;
80use tonic::{Request, Response, Result as TonicResult};
81
82use crate::error::{
83 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
84 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
85 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
86 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
87 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
88 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
89};
90use crate::event_listener::RegionServerEventListenerRef;
91use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
92
/// Cloneable handle to the region server.
///
/// The shared state lives in [`RegionServerInner`] behind an [`Arc`], and the
/// suspend flag is shared between clones as well.
#[derive(Clone)]
pub struct RegionServer {
    /// State shared by every clone of this server.
    inner: Arc<RegionServerInner>,
    /// Compression setting passed to the Flight stream built in `do_get`.
    flight_compression: FlightCompression,
    /// While `true`, `do_get` rejects requests with a `Suspended` error.
    suspend: Arc<AtomicBool>,
}
99
/// Identity, engine name and role of one region, as produced by
/// `RegionServer::reportable_regions`.
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine that serves the region.
    pub engine: String,
    /// Role reported by the engine for the region.
    pub role: RegionRole,
}
105
106impl RegionServer {
107 pub fn new(
108 query_engine: QueryEngineRef,
109 runtime: Runtime,
110 event_listener: RegionServerEventListenerRef,
111 flight_compression: FlightCompression,
112 ) -> Self {
113 Self::with_table_provider(
114 query_engine,
115 runtime,
116 event_listener,
117 Arc::new(DummyTableProviderFactory),
118 0,
119 Duration::from_millis(0),
120 flight_compression,
121 )
122 }
123
124 pub fn with_table_provider(
125 query_engine: QueryEngineRef,
126 runtime: Runtime,
127 event_listener: RegionServerEventListenerRef,
128 table_provider_factory: TableProviderFactoryRef,
129 max_concurrent_queries: usize,
130 concurrent_query_limiter_timeout: Duration,
131 flight_compression: FlightCompression,
132 ) -> Self {
133 Self {
134 inner: Arc::new(RegionServerInner::new(
135 query_engine,
136 runtime,
137 event_listener,
138 table_provider_factory,
139 RegionServerParallelism::from_opts(
140 max_concurrent_queries,
141 concurrent_query_limiter_timeout,
142 ),
143 )),
144 flight_compression,
145 suspend: Arc::new(AtomicBool::new(false)),
146 }
147 }
148
149 pub fn register_engine(&mut self, engine: RegionEngineRef) {
151 self.inner.register_engine(engine);
152 }
153
154 pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
156 self.inner.set_topic_stats_reporter(topic_stats_reporter);
157 }
158
159 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
161 match self.inner.get_engine(region_id, &RegionChange::None) {
162 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
163 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
164 Err(error::Error::RegionNotFound { .. }) => Ok(None),
165 Err(err) => Err(err),
166 }
167 }
168
169 pub fn mito_engine(&self) -> Option<MitoEngine> {
171 if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
172 Some(mito)
173 } else {
174 self.inner
175 .engines
176 .read()
177 .unwrap()
178 .get(MITO_ENGINE_NAME)
179 .cloned()
180 .and_then(|e| {
181 let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
182 if mito.is_none() {
183 warn!("Mito engine not found in region server engines");
184 }
185 mito
186 })
187 }
188 }
189
190 #[tracing::instrument(skip_all)]
191 pub async fn handle_batch_open_requests(
192 &self,
193 parallelism: usize,
194 requests: Vec<(RegionId, RegionOpenRequest)>,
195 ignore_nonexistent_region: bool,
196 ) -> Result<Vec<RegionId>> {
197 self.inner
198 .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
199 .await
200 }
201
202 #[tracing::instrument(skip_all)]
203 pub async fn handle_batch_catchup_requests(
204 &self,
205 parallelism: usize,
206 requests: Vec<(RegionId, RegionCatchupRequest)>,
207 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
208 self.inner
209 .handle_batch_catchup_requests(parallelism, requests)
210 .await
211 }
212
213 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
214 pub async fn handle_request(
215 &self,
216 region_id: RegionId,
217 request: RegionRequest,
218 ) -> Result<RegionResponse> {
219 self.inner.handle_request(region_id, request).await
220 }
221
222 async fn table_provider(
224 &self,
225 region_id: RegionId,
226 ctx: Option<QueryContextRef>,
227 ) -> Result<Arc<dyn TableProvider>> {
228 let status = self
229 .inner
230 .region_map
231 .get(®ion_id)
232 .context(RegionNotFoundSnafu { region_id })?
233 .clone();
234 ensure!(
235 matches!(status, RegionEngineWithStatus::Ready(_)),
236 RegionNotReadySnafu { region_id }
237 );
238
239 self.inner
240 .table_provider_factory
241 .create(region_id, status.into_engine(), ctx)
242 .await
243 .context(ExecuteLogicalPlanSnafu)
244 }
245
246 pub async fn handle_remote_read(
248 &self,
249 request: api::v1::region::QueryRequest,
250 query_ctx: QueryContextRef,
251 ) -> Result<SendableRecordBatchStream> {
252 let _permit = if let Some(p) = &self.inner.parallelism {
253 Some(p.acquire().await?)
254 } else {
255 None
256 };
257
258 let region_id = RegionId::from_u64(request.region_id);
259 let catalog_list = Arc::new(NameAwareCatalogList::new(
260 self.clone(),
261 region_id,
262 query_ctx.clone(),
263 ));
264
265 if query_ctx.explain_verbose() {
266 common_telemetry::info!("Handle remote read for region: {}", region_id);
267 }
268
269 let decoder = self
270 .inner
271 .query_engine
272 .engine_context(query_ctx.clone())
273 .new_plan_decoder()
274 .context(NewPlanDecoderSnafu)?;
275
276 let plan = decoder
277 .decode(Bytes::from(request.plan), catalog_list, false)
278 .await
279 .context(DecodeLogicalPlanSnafu)?;
280
281 self.inner
282 .handle_read(
283 QueryRequest {
284 header: request.header,
285 region_id,
286 plan,
287 },
288 query_ctx,
289 )
290 .await
291 }
292
293 #[tracing::instrument(skip_all)]
294 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
295 let _permit = if let Some(p) = &self.inner.parallelism {
296 Some(p.acquire().await?)
297 } else {
298 None
299 };
300
301 let ctx = request.header.as_ref().map(|h| h.into());
302 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
303
304 let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
305 .context(DataFusionSnafu)?;
306 let mut injector = injector_builder
307 .build(self, request.region_id, query_ctx.clone())
308 .await?;
309
310 let plan = request
311 .plan
312 .rewrite(&mut injector)
313 .context(DataFusionSnafu)?
314 .data;
315
316 self.inner
317 .handle_read(QueryRequest { plan, ..request }, query_ctx)
318 .await
319 }
320
321 pub fn reportable_regions(&self) -> Vec<RegionStat> {
325 self.inner
326 .region_map
327 .iter()
328 .filter_map(|e| {
329 let region_id = *e.key();
330 e.role(region_id).map(|role| RegionStat {
332 region_id,
333 engine: e.value().name().to_string(),
334 role,
335 })
336 })
337 .collect()
338 }
339
340 pub fn topic_stats(&self) -> Vec<TopicStat> {
342 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
343 let Some(reporter) = reporter.as_mut() else {
344 return vec![];
345 };
346 reporter
347 .reportable_topics()
348 .into_iter()
349 .map(|stat| TopicStat {
350 topic_name: stat.topic,
351 record_size: stat.record_size,
352 record_num: stat.record_num,
353 latest_entry_id: stat.latest_entry_id,
354 })
355 .collect()
356 }
357
358 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
359 self.inner.region_map.get(®ion_id).and_then(|engine| {
360 engine.role(region_id).map(|role| match role {
361 RegionRole::Follower => false,
362 RegionRole::Leader => true,
363 RegionRole::StagingLeader => true,
364 RegionRole::DowngradingLeader => true,
365 })
366 })
367 }
368
369 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
370 let engine = self
371 .inner
372 .region_map
373 .get(®ion_id)
374 .with_context(|| RegionNotFoundSnafu { region_id })?;
375 engine
376 .set_region_role(region_id, role)
377 .with_context(|_| HandleRegionRequestSnafu { region_id })
378 }
379
380 pub async fn set_region_role_state_gracefully(
390 &self,
391 region_id: RegionId,
392 state: SettableRegionRoleState,
393 ) -> Result<SetRegionRoleStateResponse> {
394 match self.inner.region_map.get(®ion_id) {
395 Some(engine) => Ok(engine
396 .set_region_role_state_gracefully(region_id, state)
397 .await
398 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
399 None => Ok(SetRegionRoleStateResponse::NotFound),
400 }
401 }
402
403 pub fn runtime(&self) -> Runtime {
404 self.inner.runtime.clone()
405 }
406
407 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
408 match self.inner.region_map.get(®ion_id) {
409 Some(e) => e.region_statistic(region_id),
410 None => None,
411 }
412 }
413
414 pub async fn stop(&self) -> Result<()> {
416 self.inner.stop().await
417 }
418
419 #[cfg(test)]
420 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
422 {
423 let mut engines = self.inner.engines.write().unwrap();
424 if !engines.contains_key(engine.name()) {
425 debug!("Registering test engine: {}", engine.name());
426 engines.insert(engine.name().to_string(), engine.clone());
427 }
428 }
429
430 self.inner
431 .region_map
432 .insert(region_id, RegionEngineWithStatus::Ready(engine));
433 }
434
435 async fn handle_batch_ddl_requests(
436 &self,
437 request: region_request::Body,
438 ) -> Result<RegionResponse> {
439 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
441 .context(BuildRegionRequestsSnafu)?
442 .unwrap();
443 let tracing_context = TracingContext::from_current_span();
444
445 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
446 self.inner
447 .handle_batch_request(batch_request)
448 .trace(span)
449 .await
450 }
451
452 async fn handle_requests_in_parallel(
453 &self,
454 request: region_request::Body,
455 ) -> Result<RegionResponse> {
456 let requests =
457 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
458
459 match self.try_handle_metric_batch_puts(requests).await? {
462 Either::Left(response) => Ok(response),
463 Either::Right(requests) => {
464 let tracing_context = TracingContext::from_current_span();
466 let join_tasks =
467 requests
468 .into_iter()
469 .map(|(region_id, req): (RegionId, RegionRequest)| {
470 let self_to_move = self;
471 let span = tracing_context.attach(info_span!(
472 "RegionServer::handle_region_request",
473 region_id = region_id.to_string()
474 ));
475 async move {
476 self_to_move
477 .handle_request(region_id, req)
478 .trace(span)
479 .await
480 }
481 });
482
483 let results = try_join_all(join_tasks).await?;
484 let mut affected_rows = 0;
485 let mut extensions = HashMap::new();
486 for result in results {
487 affected_rows += result.affected_rows;
488 extensions.extend(result.extensions);
489 }
490
491 Ok(RegionResponse {
492 affected_rows,
493 extensions,
494 metadata: Vec::new(),
495 })
496 }
497 }
498 }
499
500 async fn handle_requests_in_serial(
501 &self,
502 request: region_request::Body,
503 ) -> Result<RegionResponse> {
504 let requests =
505 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
506 let tracing_context = TracingContext::from_current_span();
507
508 let mut affected_rows = 0;
509 let mut extensions = HashMap::new();
510 for (region_id, req) in requests {
511 let span = tracing_context.attach(info_span!(
512 "RegionServer::handle_region_request",
513 region_id = region_id.to_string()
514 ));
515 let result = self.handle_request(region_id, req).trace(span).await?;
516
517 affected_rows += result.affected_rows;
518 extensions.extend(result.extensions);
519 }
520
521 Ok(RegionResponse {
522 affected_rows,
523 extensions,
524 metadata: Vec::new(),
525 })
526 }
527
528 async fn try_handle_metric_batch_puts(
535 &self,
536 requests: Vec<(RegionId, RegionRequest)>,
537 ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
538 if requests.is_empty() {
539 return Ok(Either::Right(requests));
540 }
541
542 if !matches!(requests[0].1, RegionRequest::Put(_)) {
544 return Ok(Either::Right(requests));
545 }
546 let first_region_id = requests[0].0;
547 let request_type = requests[0].1.request_type();
548
549 let engine = match self
553 .inner
554 .get_engine(first_region_id, &RegionChange::None)?
555 {
556 CurrentEngine::Engine(e) => e,
557 _ => return Ok(Either::Right(requests)),
558 };
559
560 if engine.name() != METRIC_ENGINE_NAME {
561 return Ok(Either::Right(requests));
562 }
563
564 let mut all_puts = true;
566 for (_, req) in &requests {
567 if !matches!(req, RegionRequest::Put(_)) {
568 all_puts = false;
569 break;
570 }
571 }
572
573 if !all_puts {
574 return Ok(Either::Right(requests));
575 }
576
577 let put_requests = requests.into_iter().map(|(region_id, req)| {
579 if let RegionRequest::Put(put) = req {
580 (region_id, put)
581 } else {
582 unreachable!("Already checked all are Put")
583 }
584 });
585
586 let metric_engine =
588 engine
589 .as_any()
590 .downcast_ref::<MetricEngine>()
591 .context(UnexpectedSnafu {
592 violated: "Failed to downcast to MetricEngine",
593 })?;
594
595 let tracing_context = TracingContext::from_current_span();
596 let batch_size = put_requests.len();
597 let span = tracing_context.attach(info_span!(
598 "RegionServer::handle_metric_batch_puts",
599 batch_size = batch_size,
600 ));
601 let result = metric_engine
602 .put_regions_batch(put_requests)
603 .trace(span)
604 .await
605 .map_err(BoxedError::new)
606 .context(HandleRegionRequestSnafu {
607 region_id: first_region_id,
608 });
609
610 match result {
611 Ok(total_affected) => {
612 crate::metrics::REGION_CHANGED_ROW_COUNT
613 .with_label_values(&[request_type])
614 .inc_by(total_affected as u64);
615 Ok(Either::Left(RegionResponse::new(total_affected)))
616 }
617 Err(err) => {
618 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
619 .with_label_values(&[request_type])
620 .inc_by(batch_size as u64);
621 Err(err)
622 }
623 }
624 }
625
626 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
627 let region_id = RegionId::from_u64(request.region_id);
628 let manifest_info = request
629 .manifest_info
630 .context(error::MissingRequiredFieldSnafu {
631 name: "manifest_info",
632 })?;
633
634 let manifest_info = match manifest_info {
635 ManifestInfo::MitoManifestInfo(info) => {
636 RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
637 }
638 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
639 info.data_manifest_version,
640 0,
641 info.metadata_manifest_version,
642 0,
643 ),
644 };
645
646 let tracing_context = TracingContext::from_current_span();
647 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
648
649 self.sync_region(
650 region_id,
651 SyncRegionFromRequest::from_manifest(manifest_info),
652 )
653 .trace(span)
654 .await
655 .map(|_| RegionResponse::new(AffectedRows::default()))
656 }
657
658 #[tracing::instrument(skip_all)]
663 async fn handle_list_metadata_request(
664 &self,
665 request: &ListMetadataRequest,
666 ) -> Result<RegionResponse> {
667 let mut region_metadatas = Vec::new();
668 for region_id in &request.region_ids {
670 let region_id = RegionId::from_u64(*region_id);
671 let Some(engine) = self.find_engine(region_id)? else {
673 region_metadatas.push(None);
674 continue;
675 };
676
677 match engine.get_metadata(region_id).await {
678 Ok(metadata) => region_metadatas.push(Some(metadata)),
679 Err(err) => {
680 if err.status_code() == StatusCode::RegionNotFound {
681 region_metadatas.push(None);
682 } else {
683 Err(err).with_context(|_| GetRegionMetadataSnafu {
684 engine: engine.name(),
685 region_id,
686 })?;
687 }
688 }
689 }
690 }
691
692 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
694
695 let response = RegionResponse::from_metadata(json_result);
696
697 Ok(response)
698 }
699
700 pub async fn sync_region(
702 &self,
703 region_id: RegionId,
704 request: SyncRegionFromRequest,
705 ) -> Result<()> {
706 let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
707 CurrentEngine::Engine(engine) => engine,
708 _ => {
709 return UnexpectedSnafu {
710 violated: "unexpected EarlyReturn engine status for a ready region",
711 }
712 .fail();
713 }
714 };
715
716 self.inner
717 .handle_sync_region(&engine, region_id, request)
718 .await
719 }
720
721 pub async fn remap_manifests(
723 &self,
724 request: RemapManifestsRequest,
725 ) -> Result<RemapManifestsResponse> {
726 let region_id = request.region_id;
727 let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
728 CurrentEngine::Engine(engine) => engine,
729 _ => {
730 return UnexpectedSnafu {
731 violated: "unexpected EarlyReturn engine status for a ready region",
732 }
733 .fail();
734 }
735 };
736
737 engine
738 .remap_manifests(request)
739 .await
740 .with_context(|_| HandleRegionRequestSnafu { region_id })
741 }
742
743 fn is_suspended(&self) -> bool {
744 self.suspend.load(Ordering::Relaxed)
745 }
746
747 pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
748 self.suspend.clone()
749 }
750}
751
752#[async_trait]
753impl RegionServerHandler for RegionServer {
754 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
755 let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
756 .with_label_values(&[request.as_ref()]);
757 let response = match &request {
758 region_request::Body::Creates(_)
759 | region_request::Body::Drops(_)
760 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
761 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
762 self.handle_requests_in_parallel(request).await
763 }
764 region_request::Body::Sync(sync_request) => {
765 self.handle_sync_region_request(sync_request).await
766 }
767 region_request::Body::ListMetadata(list_metadata_request) => {
768 self.handle_list_metadata_request(list_metadata_request)
769 .await
770 }
771 _ => self.handle_requests_in_serial(request).await,
772 }
773 .map_err(BoxedError::new)
774 .inspect_err(|_| {
775 failed_requests_cnt.inc();
776 })
777 .context(ExecuteGrpcRequestSnafu)?;
778
779 Ok(RegionResponseV1 {
780 header: Some(ResponseHeader {
781 status: Some(Status {
782 status_code: StatusCode::Success as _,
783 ..Default::default()
784 }),
785 }),
786 affected_rows: response.affected_rows as _,
787 extensions: response.extensions,
788 metadata: response.metadata,
789 })
790 }
791}
792
793#[async_trait]
794impl FlightCraft for RegionServer {
795 async fn do_get(
796 &self,
797 request: Request<Ticket>,
798 ) -> TonicResult<Response<TonicStream<FlightData>>> {
799 ensure!(!self.is_suspended(), SuspendedSnafu);
800
801 let ticket = request.into_inner().ticket;
802 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
803 .context(servers_error::InvalidFlightTicketSnafu)?;
804 let tracing_context = request
805 .header
806 .as_ref()
807 .map(|h| TracingContext::from_w3c(&h.tracing_context))
808 .unwrap_or_default();
809 let query_ctx = request
810 .header
811 .as_ref()
812 .map(|h| Arc::new(QueryContext::from(h)))
813 .unwrap_or(QueryContext::arc());
814
815 let result = self
816 .handle_remote_read(request, query_ctx.clone())
817 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
818 .await?;
819
820 let stream = Box::pin(FlightRecordBatchStream::new(
821 result,
822 tracing_context,
823 self.flight_compression,
824 query_ctx,
825 ));
826 Ok(Response::new(stream))
827 }
828}
829
/// A region's engine tagged with the region's lifecycle status.
///
/// `RegionServerInner::get_engine` decides from the status whether a request
/// may proceed.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// The region is being registered. Requests that are neither a register
    /// nor a deregister get `RegionNotReady`; a deregister gets `RegionBusy`.
    Registering(RegionEngineRef),
    /// The region is being deregistered. A register gets `RegionBusy`, and a
    /// further deregister returns early.
    Deregistering(RegionEngineRef),
    /// The region is ready to serve requests.
    Ready(RegionEngineRef),
}
839
840impl RegionEngineWithStatus {
841 pub fn into_engine(self) -> RegionEngineRef {
843 match self {
844 RegionEngineWithStatus::Registering(engine) => engine,
845 RegionEngineWithStatus::Deregistering(engine) => engine,
846 RegionEngineWithStatus::Ready(engine) => engine,
847 }
848 }
849}
850
851impl Deref for RegionEngineWithStatus {
852 type Target = RegionEngineRef;
853
854 fn deref(&self) -> &Self::Target {
855 match self {
856 RegionEngineWithStatus::Registering(engine) => engine,
857 RegionEngineWithStatus::Deregistering(engine) => engine,
858 RegionEngineWithStatus::Ready(engine) => engine,
859 }
860 }
861}
862
/// State shared by every clone of [`RegionServer`].
struct RegionServerInner {
    /// Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Engine and lifecycle status of each known region.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine used to build plan decoders for remote reads.
    query_engine: QueryEngineRef,
    /// Runtime handed out by `RegionServer::runtime`.
    runtime: Runtime,
    /// Event listener supplied at construction.
    event_listener: RegionServerEventListenerRef,
    /// Factory that creates table providers for regions.
    table_provider_factory: TableProviderFactoryRef,
    /// Concurrent-query limiter; `None` when no limit is configured.
    parallelism: Option<RegionServerParallelism>,
    /// Reporter read by `RegionServer::topic_stats`, if one has been set.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// Clone of the registered [`MitoEngine`], stored by `register_engine`.
    mito_engine: RwLock<Option<MitoEngine>>,
}
880
/// Limits the number of concurrently running queries.
struct RegionServerParallelism {
    /// Semaphore holding one permit per allowed concurrent query.
    semaphore: Semaphore,
    /// Maximum time `acquire` waits for a permit.
    timeout: Duration,
}
885
886impl RegionServerParallelism {
887 pub fn from_opts(
888 max_concurrent_queries: usize,
889 concurrent_query_limiter_timeout: Duration,
890 ) -> Option<Self> {
891 if max_concurrent_queries == 0 {
892 return None;
893 }
894 Some(RegionServerParallelism {
895 semaphore: Semaphore::new(max_concurrent_queries),
896 timeout: concurrent_query_limiter_timeout,
897 })
898 }
899
900 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
901 timeout(self.timeout, self.semaphore.acquire())
902 .await
903 .context(ConcurrentQueryLimiterTimeoutSnafu)?
904 .context(ConcurrentQueryLimiterClosedSnafu)
905 }
906}
907
/// Outcome of `RegionServerInner::get_engine`.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// No engine call is needed; carries the affected-row count to report.
    EarlyReturn(AffectedRows),
}
912
913impl Debug for CurrentEngine {
914 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
915 match self {
916 CurrentEngine::Engine(engine) => f
917 .debug_struct("CurrentEngine")
918 .field("engine", &engine.name())
919 .finish(),
920 CurrentEngine::EarlyReturn(rows) => f
921 .debug_struct("CurrentEngine")
922 .field("return", rows)
923 .finish(),
924 }
925 }
926}
927
928impl RegionServerInner {
929 pub fn new(
930 query_engine: QueryEngineRef,
931 runtime: Runtime,
932 event_listener: RegionServerEventListenerRef,
933 table_provider_factory: TableProviderFactoryRef,
934 parallelism: Option<RegionServerParallelism>,
935 ) -> Self {
936 Self {
937 engines: RwLock::new(HashMap::new()),
938 region_map: DashMap::new(),
939 query_engine,
940 runtime,
941 event_listener,
942 table_provider_factory,
943 parallelism,
944 topic_stats_reporter: RwLock::new(None),
945 mito_engine: RwLock::new(None),
946 }
947 }
948
949 pub fn register_engine(&self, engine: RegionEngineRef) {
950 let engine_name = engine.name();
951 if engine_name == MITO_ENGINE_NAME
952 && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
953 {
954 *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
955 }
956
957 info!("Region Engine {engine_name} is registered");
958 self.engines
959 .write()
960 .unwrap()
961 .insert(engine_name.to_string(), engine);
962 }
963
964 pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
965 info!("Set topic stats reporter");
966 *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
967 }
968
969 fn get_engine(
970 &self,
971 region_id: RegionId,
972 region_change: &RegionChange,
973 ) -> Result<CurrentEngine> {
974 let current_region_status = self.region_map.get(®ion_id);
975
976 let engine = match region_change {
977 RegionChange::Register(attribute) => match current_region_status {
978 Some(status) => match status.clone() {
979 RegionEngineWithStatus::Registering(engine) => engine,
980 RegionEngineWithStatus::Deregistering(_) => {
981 return error::RegionBusySnafu { region_id }.fail();
982 }
983 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
984 },
985 _ => self
986 .engines
987 .read()
988 .unwrap()
989 .get(attribute.engine())
990 .with_context(|| RegionEngineNotFoundSnafu {
991 name: attribute.engine(),
992 })?
993 .clone(),
994 },
995 RegionChange::Deregisters => match current_region_status {
996 Some(status) => match status.clone() {
997 RegionEngineWithStatus::Registering(_) => {
998 return error::RegionBusySnafu { region_id }.fail();
999 }
1000 RegionEngineWithStatus::Deregistering(_) => {
1001 return Ok(CurrentEngine::EarlyReturn(0));
1002 }
1003 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1004 },
1005 None => return Ok(CurrentEngine::EarlyReturn(0)),
1006 },
1007 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1008 match current_region_status {
1009 Some(status) => match status.clone() {
1010 RegionEngineWithStatus::Registering(_) => {
1011 return error::RegionNotReadySnafu { region_id }.fail();
1012 }
1013 RegionEngineWithStatus::Deregistering(_) => {
1014 return error::RegionNotFoundSnafu { region_id }.fail();
1015 }
1016 RegionEngineWithStatus::Ready(engine) => engine,
1017 },
1018 None => return error::RegionNotFoundSnafu { region_id }.fail(),
1019 }
1020 }
1021 };
1022
1023 Ok(CurrentEngine::Engine(engine))
1024 }
1025
1026 async fn handle_batch_open_requests_inner(
1027 &self,
1028 engine: RegionEngineRef,
1029 parallelism: usize,
1030 requests: Vec<(RegionId, RegionOpenRequest)>,
1031 ignore_nonexistent_region: bool,
1032 ) -> Result<Vec<RegionId>> {
1033 let region_changes = requests
1034 .iter()
1035 .map(|(region_id, open)| {
1036 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1037 Ok((*region_id, RegionChange::Register(attribute)))
1038 })
1039 .collect::<Result<HashMap<_, _>>>()?;
1040
1041 for (®ion_id, region_change) in ®ion_changes {
1042 self.set_region_status_not_ready(region_id, &engine, region_change)
1043 }
1044
1045 let mut open_regions = Vec::with_capacity(requests.len());
1046 let mut errors = vec![];
1047 match engine
1048 .handle_batch_open_requests(parallelism, requests)
1049 .await
1050 .with_context(|_| HandleBatchOpenRequestSnafu)
1051 {
1052 Ok(results) => {
1053 for (region_id, result) in results {
1054 let region_change = ®ion_changes[®ion_id];
1055 match result {
1056 Ok(_) => {
1057 if let Err(e) = self
1058 .set_region_status_ready(region_id, engine.clone(), *region_change)
1059 .await
1060 {
1061 error!(e; "Failed to set region to ready: {}", region_id);
1062 errors.push(BoxedError::new(e));
1063 } else {
1064 open_regions.push(region_id)
1065 }
1066 }
1067 Err(e) => {
1068 self.unset_region_status(region_id, &engine, *region_change);
1069 if e.status_code() == StatusCode::RegionNotFound
1070 && ignore_nonexistent_region
1071 {
1072 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1073 } else {
1074 error!(e; "Failed to open region: {}", region_id);
1075 errors.push(e);
1076 }
1077 }
1078 }
1079 }
1080 }
1081 Err(e) => {
1082 for (®ion_id, region_change) in ®ion_changes {
1083 self.unset_region_status(region_id, &engine, *region_change);
1084 }
1085 error!(e; "Failed to open batch regions");
1086 errors.push(BoxedError::new(e));
1087 }
1088 }
1089
1090 if !errors.is_empty() {
1091 return error::UnexpectedSnafu {
1092 violated: format!("Failed to open batch regions: {:?}", errors[0]),
1094 }
1095 .fail();
1096 }
1097
1098 Ok(open_regions)
1099 }
1100
1101 pub async fn handle_batch_open_requests(
1102 &self,
1103 parallelism: usize,
1104 requests: Vec<(RegionId, RegionOpenRequest)>,
1105 ignore_nonexistent_region: bool,
1106 ) -> Result<Vec<RegionId>> {
1107 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1108 HashMap::with_capacity(requests.len());
1109 for (region_id, request) in requests {
1110 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1111 requests.push((region_id, request));
1112 } else {
1113 engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1114 }
1115 }
1116
1117 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1118 for (engine, requests) in engine_grouped_requests {
1119 let engine = self
1120 .engines
1121 .read()
1122 .unwrap()
1123 .get(&engine)
1124 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1125 .clone();
1126 results.push(
1127 self.handle_batch_open_requests_inner(
1128 engine,
1129 parallelism,
1130 requests,
1131 ignore_nonexistent_region,
1132 )
1133 .await,
1134 )
1135 }
1136
1137 Ok(results
1138 .into_iter()
1139 .collect::<Result<Vec<_>>>()?
1140 .into_iter()
1141 .flatten()
1142 .collect::<Vec<_>>())
1143 }
1144
1145 pub async fn handle_batch_catchup_requests_inner(
1146 &self,
1147 engine: RegionEngineRef,
1148 parallelism: usize,
1149 requests: Vec<(RegionId, RegionCatchupRequest)>,
1150 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1151 for (region_id, _) in &requests {
1152 self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1153 }
1154 let region_ids = requests
1155 .iter()
1156 .map(|(region_id, _)| *region_id)
1157 .collect::<Vec<_>>();
1158 let mut responses = Vec::with_capacity(requests.len());
1159 match engine
1160 .handle_batch_catchup_requests(parallelism, requests)
1161 .await
1162 {
1163 Ok(results) => {
1164 for (region_id, result) in results {
1165 match result {
1166 Ok(_) => {
1167 if let Err(e) = self
1168 .set_region_status_ready(
1169 region_id,
1170 engine.clone(),
1171 RegionChange::Catchup,
1172 )
1173 .await
1174 {
1175 error!(e; "Failed to set region to ready: {}", region_id);
1176 responses.push((region_id, Err(BoxedError::new(e))));
1177 } else {
1178 responses.push((region_id, Ok(())));
1179 }
1180 }
1181 Err(e) => {
1182 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1183 error!(e; "Failed to catchup region: {}", region_id);
1184 responses.push((region_id, Err(e)));
1185 }
1186 }
1187 }
1188 }
1189 Err(e) => {
1190 for region_id in region_ids {
1191 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1192 }
1193 error!(e; "Failed to catchup batch regions");
1194 return error::UnexpectedSnafu {
1195 violated: format!("Failed to catchup batch regions: {:?}", e),
1196 }
1197 .fail();
1198 }
1199 }
1200
1201 Ok(responses)
1202 }
1203
1204 pub async fn handle_batch_catchup_requests(
1205 &self,
1206 parallelism: usize,
1207 requests: Vec<(RegionId, RegionCatchupRequest)>,
1208 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1209 let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1210
1211 let mut responses = Vec::with_capacity(requests.len());
1212 for (region_id, request) in requests {
1213 if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1214 match engine {
1215 CurrentEngine::Engine(engine) => {
1216 engine_grouped_requests
1217 .entry(engine.name().to_string())
1218 .or_default()
1219 .push((region_id, request));
1220 }
1221 CurrentEngine::EarlyReturn(_) => {
1222 return error::UnexpectedSnafu {
1223 violated: format!("Unexpected engine type for region {}", region_id),
1224 }
1225 .fail();
1226 }
1227 }
1228 } else {
1229 responses.push((
1230 region_id,
1231 Err(BoxedError::new(
1232 error::RegionNotFoundSnafu { region_id }.build(),
1233 )),
1234 ));
1235 }
1236 }
1237
1238 for (engine, requests) in engine_grouped_requests {
1239 let engine = self
1240 .engines
1241 .read()
1242 .unwrap()
1243 .get(&engine)
1244 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1245 .clone();
1246 responses.extend(
1247 self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1248 .await?,
1249 );
1250 }
1251
1252 Ok(responses)
1253 }
1254
1255 pub async fn handle_batch_request(
1259 &self,
1260 batch_request: BatchRegionDdlRequest,
1261 ) -> Result<RegionResponse> {
1262 let region_changes = match &batch_request {
1263 BatchRegionDdlRequest::Create(requests) => requests
1264 .iter()
1265 .map(|(region_id, create)| {
1266 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1267 Ok((*region_id, RegionChange::Register(attribute)))
1268 })
1269 .collect::<Result<Vec<_>>>()?,
1270 BatchRegionDdlRequest::Drop(requests) => requests
1271 .iter()
1272 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1273 .collect::<Vec<_>>(),
1274 BatchRegionDdlRequest::Alter(requests) => requests
1275 .iter()
1276 .map(|(region_id, _)| (*region_id, RegionChange::None))
1277 .collect::<Vec<_>>(),
1278 };
1279
1280 let (first_region_id, first_region_change) = region_changes.first().unwrap();
1283 let engine = match self.get_engine(*first_region_id, first_region_change)? {
1284 CurrentEngine::Engine(engine) => engine,
1285 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1286 };
1287
1288 for (region_id, region_change) in region_changes.iter() {
1289 self.set_region_status_not_ready(*region_id, &engine, region_change);
1290 }
1291
1292 let ddl_type = batch_request.request_type();
1293 let result = engine
1294 .handle_batch_ddl_requests(batch_request)
1295 .await
1296 .context(HandleBatchDdlRequestSnafu { ddl_type });
1297
1298 match result {
1299 Ok(result) => {
1300 for (region_id, region_change) in ®ion_changes {
1301 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1302 .await?;
1303 }
1304
1305 Ok(RegionResponse {
1306 affected_rows: result.affected_rows,
1307 extensions: result.extensions,
1308 metadata: Vec::new(),
1309 })
1310 }
1311 Err(err) => {
1312 for (region_id, region_change) in region_changes {
1313 self.unset_region_status(region_id, &engine, region_change);
1314 }
1315
1316 Err(err)
1317 }
1318 }
1319 }
1320
1321 pub async fn handle_request(
1322 &self,
1323 region_id: RegionId,
1324 request: RegionRequest,
1325 ) -> Result<RegionResponse> {
1326 let request_type = request.request_type();
1327 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1328 .with_label_values(&[request_type])
1329 .start_timer();
1330
1331 let region_change = match &request {
1332 RegionRequest::Create(create) => {
1333 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1334 RegionChange::Register(attribute)
1335 }
1336 RegionRequest::Open(open) => {
1337 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1338 RegionChange::Register(attribute)
1339 }
1340 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1341 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1342 RegionChange::Ingest
1343 }
1344 RegionRequest::Alter(_)
1345 | RegionRequest::Flush(_)
1346 | RegionRequest::Compact(_)
1347 | RegionRequest::Truncate(_)
1348 | RegionRequest::BuildIndex(_)
1349 | RegionRequest::EnterStaging(_)
1350 | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1351 RegionRequest::Catchup(_) => RegionChange::Catchup,
1352 };
1353
1354 let engine = match self.get_engine(region_id, ®ion_change)? {
1355 CurrentEngine::Engine(engine) => engine,
1356 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1357 };
1358
1359 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1361
1362 match engine
1363 .handle_request(region_id, request)
1364 .await
1365 .with_context(|_| HandleRegionRequestSnafu { region_id })
1366 {
1367 Ok(result) => {
1368 if matches!(region_change, RegionChange::Ingest) {
1370 crate::metrics::REGION_CHANGED_ROW_COUNT
1371 .with_label_values(&[request_type])
1372 .inc_by(result.affected_rows as u64);
1373 }
1374 self.set_region_status_ready(region_id, engine.clone(), region_change)
1376 .await?;
1377
1378 Ok(RegionResponse {
1379 affected_rows: result.affected_rows,
1380 extensions: result.extensions,
1381 metadata: Vec::new(),
1382 })
1383 }
1384 Err(err) => {
1385 if matches!(region_change, RegionChange::Ingest) {
1386 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1387 .with_label_values(&[request_type])
1388 .inc();
1389 }
1390 self.unset_region_status(region_id, &engine, region_change);
1392 Err(err)
1393 }
1394 }
1395 }
1396
1397 pub async fn handle_sync_region(
1399 &self,
1400 engine: &RegionEngineRef,
1401 region_id: RegionId,
1402 request: SyncRegionFromRequest,
1403 ) -> Result<()> {
1404 let Some(new_opened_regions) = engine
1405 .sync_region(region_id, request)
1406 .await
1407 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1408 .new_opened_logical_region_ids()
1409 else {
1410 return Ok(());
1411 };
1412
1413 for region in &new_opened_regions {
1414 self.region_map
1415 .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1416 }
1417 if !new_opened_regions.is_empty() {
1418 info!(
1419 region_id = %region_id,
1420 logical_region_count = new_opened_regions.len(),
1421 logical_regions = ?new_opened_regions,
1422 "Logical regions are registered"
1423 );
1424 }
1425
1426 Ok(())
1427 }
1428
1429 fn set_region_status_not_ready(
1430 &self,
1431 region_id: RegionId,
1432 engine: &RegionEngineRef,
1433 region_change: &RegionChange,
1434 ) {
1435 match region_change {
1436 RegionChange::Register(_) => {
1437 self.region_map.insert(
1438 region_id,
1439 RegionEngineWithStatus::Registering(engine.clone()),
1440 );
1441 }
1442 RegionChange::Deregisters => {
1443 self.region_map.insert(
1444 region_id,
1445 RegionEngineWithStatus::Deregistering(engine.clone()),
1446 );
1447 }
1448 _ => {}
1449 }
1450 }
1451
1452 fn unset_region_status(
1453 &self,
1454 region_id: RegionId,
1455 engine: &RegionEngineRef,
1456 region_change: RegionChange,
1457 ) {
1458 match region_change {
1459 RegionChange::None | RegionChange::Ingest => {}
1460 RegionChange::Register(_) => {
1461 self.region_map.remove(®ion_id);
1462 }
1463 RegionChange::Deregisters => {
1464 self.region_map
1465 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1466 }
1467 RegionChange::Catchup => {}
1468 }
1469 }
1470
1471 async fn set_region_status_ready(
1472 &self,
1473 region_id: RegionId,
1474 engine: RegionEngineRef,
1475 region_change: RegionChange,
1476 ) -> Result<()> {
1477 let engine_type = engine.name();
1478 match region_change {
1479 RegionChange::None | RegionChange::Ingest => {}
1480 RegionChange::Register(attribute) => {
1481 info!(
1482 "Region {region_id} is registered to engine {}",
1483 attribute.engine()
1484 );
1485 self.region_map
1486 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1487
1488 match attribute {
1489 RegionAttribute::Metric { physical } => {
1490 if physical {
1491 self.register_logical_regions(&engine, region_id).await?;
1493 self.event_listener.on_region_registered(region_id);
1495 }
1496 }
1497 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1498 RegionAttribute::File => {
1499 }
1501 }
1502 }
1503 RegionChange::Deregisters => {
1504 info!("Region {region_id} is deregistered from engine {engine_type}");
1505 self.region_map
1506 .remove(®ion_id)
1507 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1508 self.event_listener.on_region_deregistered(region_id);
1509 }
1510 RegionChange::Catchup => {
1511 if is_metric_engine(engine.name()) {
1512 self.register_logical_regions(&engine, region_id).await?;
1514 }
1515 }
1516 }
1517 Ok(())
1518 }
1519
1520 async fn register_logical_regions(
1521 &self,
1522 engine: &RegionEngineRef,
1523 physical_region_id: RegionId,
1524 ) -> Result<()> {
1525 let metric_engine =
1526 engine
1527 .as_any()
1528 .downcast_ref::<MetricEngine>()
1529 .context(UnexpectedSnafu {
1530 violated: format!(
1531 "expecting engine type '{}', actual '{}'",
1532 METRIC_ENGINE_NAME,
1533 engine.name(),
1534 ),
1535 })?;
1536
1537 let logical_regions = metric_engine
1538 .logical_regions(physical_region_id)
1539 .await
1540 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1541
1542 for region in &logical_regions {
1543 self.region_map
1544 .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1545 }
1546 if !logical_regions.is_empty() {
1547 info!(
1548 physical_region_id = %physical_region_id,
1549 logical_region_count = logical_regions.len(),
1550 logical_regions = ?logical_regions,
1551 "Logical regions are registered"
1552 );
1553 }
1554 Ok(())
1555 }
1556
1557 pub async fn handle_read(
1558 &self,
1559 request: QueryRequest,
1560 query_ctx: QueryContextRef,
1561 ) -> Result<SendableRecordBatchStream> {
1562 let result = self
1565 .query_engine
1566 .execute(request.plan, query_ctx)
1567 .await
1568 .context(ExecuteLogicalPlanSnafu)?;
1569
1570 match result.data {
1571 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1572 UnsupportedOutputSnafu { expected: "stream" }.fail()
1573 }
1574 OutputData::Stream(stream) => Ok(stream),
1575 }
1576 }
1577
1578 async fn stop(&self) -> Result<()> {
1579 let regions = self
1587 .region_map
1588 .iter()
1589 .map(|x| (*x.key(), x.value().clone()))
1590 .collect::<Vec<_>>();
1591 let num_regions = regions.len();
1592
1593 for (region_id, engine) in regions {
1594 let closed = engine
1595 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1596 .await;
1597 match closed {
1598 Ok(_) => debug!("Region {region_id} is closed"),
1599 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1600 }
1601 }
1602 self.region_map.clear();
1603 info!("closed {num_regions} regions");
1604
1605 drop(self.mito_engine.write().unwrap().take());
1606 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1607 for (engine_name, engine) in engines {
1608 engine
1609 .stop()
1610 .await
1611 .context(StopRegionEngineSnafu { name: &engine_name })?;
1612 info!("Region engine {engine_name} is stopped");
1613 }
1614
1615 Ok(())
1616 }
1617}
1618
/// The effect a request has on a region's entry in the region map.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the region's registration (e.g. alter, flush,
    /// compact, truncate).
    None,
    /// The region is being created or opened; carries the attribute parsed from
    /// the request's engine name and options.
    Register(RegionAttribute),
    /// The region is being closed or dropped.
    Deregisters,
    /// The region is catching up.
    Catchup,
    /// The request writes data (put, delete or bulk insert).
    Ingest,
}
1627
/// Returns `true` when `engine` is the name of the metric engine.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1631
1632fn parse_region_attribute(
1633 engine: &str,
1634 options: &HashMap<String, String>,
1635) -> Result<RegionAttribute> {
1636 match engine {
1637 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1638 METRIC_ENGINE_NAME => {
1639 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1640
1641 Ok(RegionAttribute::Metric { physical })
1642 }
1643 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1644 _ => error::UnexpectedSnafu {
1645 violated: format!("Unknown engine: {}", engine),
1646 }
1647 .fail(),
1648 }
1649}
1650
/// The engine-specific kind of a region, as derived by `parse_region_attribute`.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region of the Mito engine.
    Mito,
    /// A region of the metric engine; `physical` is `false` when the region
    /// options contain the logical table metadata key.
    Metric { physical: bool },
    /// A region of the file engine.
    File,
}
1657
1658impl RegionAttribute {
1659 fn engine(&self) -> &'static str {
1660 match self {
1661 RegionAttribute::Mito => MITO_ENGINE_NAME,
1662 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1663 RegionAttribute::File => FILE_ENGINE_NAME,
1664 }
1665 }
1666}
1667
1668#[cfg(test)]
1669mod tests {
1670
1671 use std::assert_matches;
1672
1673 use api::v1::SemanticType;
1674 use common_error::ext::ErrorExt;
1675 use datatypes::prelude::ConcreteDataType;
1676 use mito2::test_util::CreateRequestBuilder;
1677 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1678 use store_api::region_engine::RegionEngine;
1679 use store_api::region_request::{
1680 PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1681 };
1682 use store_api::storage::RegionId;
1683
1684 use super::*;
1685 use crate::error::Result;
1686 use crate::tests::{MockRegionEngine, mock_region_server};
1687
1688 #[tokio::test]
1689 async fn test_region_registering() {
1690 common_telemetry::init_default_ut_logging();
1691
1692 let mut mock_region_server = mock_region_server();
1693 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1694 let engine_name = engine.name();
1695 mock_region_server.register_engine(engine.clone());
1696 let region_id = RegionId::new(1, 1);
1697 let builder = CreateRequestBuilder::new();
1698 let create_req = builder.build();
1699 mock_region_server.inner.region_map.insert(
1701 region_id,
1702 RegionEngineWithStatus::Registering(engine.clone()),
1703 );
1704 let response = mock_region_server
1705 .handle_request(region_id, RegionRequest::Create(create_req))
1706 .await
1707 .unwrap();
1708 assert_eq!(response.affected_rows, 0);
1709 let status = mock_region_server
1710 .inner
1711 .region_map
1712 .get(®ion_id)
1713 .unwrap()
1714 .clone();
1715 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1716
1717 mock_region_server.inner.region_map.insert(
1718 region_id,
1719 RegionEngineWithStatus::Registering(engine.clone()),
1720 );
1721 let response = mock_region_server
1722 .handle_request(
1723 region_id,
1724 RegionRequest::Open(RegionOpenRequest {
1725 engine: engine_name.to_string(),
1726 table_dir: String::new(),
1727 path_type: PathType::Bare,
1728 options: Default::default(),
1729 skip_wal_replay: false,
1730 checkpoint: None,
1731 }),
1732 )
1733 .await
1734 .unwrap();
1735 assert_eq!(response.affected_rows, 0);
1736 let status = mock_region_server
1737 .inner
1738 .region_map
1739 .get(®ion_id)
1740 .unwrap()
1741 .clone();
1742 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1743 }
1744
1745 #[tokio::test]
1746 async fn test_region_deregistering() {
1747 common_telemetry::init_default_ut_logging();
1748
1749 let mut mock_region_server = mock_region_server();
1750 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1751
1752 mock_region_server.register_engine(engine.clone());
1753
1754 let region_id = RegionId::new(1, 1);
1755
1756 mock_region_server.inner.region_map.insert(
1758 region_id,
1759 RegionEngineWithStatus::Deregistering(engine.clone()),
1760 );
1761
1762 let response = mock_region_server
1763 .handle_request(
1764 region_id,
1765 RegionRequest::Drop(RegionDropRequest {
1766 fast_path: false,
1767 force: false,
1768 partial_drop: false,
1769 }),
1770 )
1771 .await
1772 .unwrap();
1773 assert_eq!(response.affected_rows, 0);
1774
1775 let status = mock_region_server
1776 .inner
1777 .region_map
1778 .get(®ion_id)
1779 .unwrap()
1780 .clone();
1781 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1782
1783 mock_region_server.inner.region_map.insert(
1784 region_id,
1785 RegionEngineWithStatus::Deregistering(engine.clone()),
1786 );
1787
1788 let response = mock_region_server
1789 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1790 .await
1791 .unwrap();
1792 assert_eq!(response.affected_rows, 0);
1793
1794 let status = mock_region_server
1795 .inner
1796 .region_map
1797 .get(®ion_id)
1798 .unwrap()
1799 .clone();
1800 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1801 }
1802
1803 #[tokio::test]
1804 async fn test_region_not_ready() {
1805 common_telemetry::init_default_ut_logging();
1806
1807 let mut mock_region_server = mock_region_server();
1808 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1809
1810 mock_region_server.register_engine(engine.clone());
1811
1812 let region_id = RegionId::new(1, 1);
1813
1814 mock_region_server.inner.region_map.insert(
1816 region_id,
1817 RegionEngineWithStatus::Registering(engine.clone()),
1818 );
1819
1820 let err = mock_region_server
1821 .handle_request(
1822 region_id,
1823 RegionRequest::Truncate(RegionTruncateRequest::All),
1824 )
1825 .await
1826 .unwrap_err();
1827
1828 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1829 }
1830
1831 #[tokio::test]
1832 async fn test_region_request_failed() {
1833 common_telemetry::init_default_ut_logging();
1834
1835 let mut mock_region_server = mock_region_server();
1836 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1837 MITO_ENGINE_NAME,
1838 Box::new(|_region_id, _request| {
1839 error::UnexpectedSnafu {
1840 violated: "test".to_string(),
1841 }
1842 .fail()
1843 }),
1844 );
1845
1846 mock_region_server.register_engine(engine.clone());
1847
1848 let region_id = RegionId::new(1, 1);
1849 let builder = CreateRequestBuilder::new();
1850 let create_req = builder.build();
1851 mock_region_server
1852 .handle_request(region_id, RegionRequest::Create(create_req))
1853 .await
1854 .unwrap_err();
1855
1856 let status = mock_region_server.inner.region_map.get(®ion_id);
1857 assert!(status.is_none());
1858
1859 mock_region_server
1860 .inner
1861 .region_map
1862 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1863
1864 mock_region_server
1865 .handle_request(
1866 region_id,
1867 RegionRequest::Drop(RegionDropRequest {
1868 fast_path: false,
1869 force: false,
1870 partial_drop: false,
1871 }),
1872 )
1873 .await
1874 .unwrap_err();
1875
1876 let status = mock_region_server.inner.region_map.get(®ion_id);
1877 assert!(status.is_some());
1878 }
1879
1880 #[tokio::test]
1881 async fn test_batch_open_region_ignore_nonexistent_regions() {
1882 common_telemetry::init_default_ut_logging();
1883 let mut mock_region_server = mock_region_server();
1884 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1885 MITO_ENGINE_NAME,
1886 Box::new(|region_id, _request| {
1887 if region_id == RegionId::new(1, 1) {
1888 error::RegionNotFoundSnafu { region_id }.fail()
1889 } else {
1890 Ok(0)
1891 }
1892 }),
1893 );
1894 mock_region_server.register_engine(engine.clone());
1895
1896 let region_ids = mock_region_server
1897 .handle_batch_open_requests(
1898 8,
1899 vec![
1900 (
1901 RegionId::new(1, 1),
1902 RegionOpenRequest {
1903 engine: MITO_ENGINE_NAME.to_string(),
1904 table_dir: String::new(),
1905 path_type: PathType::Bare,
1906 options: Default::default(),
1907 skip_wal_replay: false,
1908 checkpoint: None,
1909 },
1910 ),
1911 (
1912 RegionId::new(1, 2),
1913 RegionOpenRequest {
1914 engine: MITO_ENGINE_NAME.to_string(),
1915 table_dir: String::new(),
1916 path_type: PathType::Bare,
1917 options: Default::default(),
1918 skip_wal_replay: false,
1919 checkpoint: None,
1920 },
1921 ),
1922 ],
1923 true,
1924 )
1925 .await
1926 .unwrap();
1927 assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1928
1929 let err = mock_region_server
1930 .handle_batch_open_requests(
1931 8,
1932 vec![
1933 (
1934 RegionId::new(1, 1),
1935 RegionOpenRequest {
1936 engine: MITO_ENGINE_NAME.to_string(),
1937 table_dir: String::new(),
1938 path_type: PathType::Bare,
1939 options: Default::default(),
1940 skip_wal_replay: false,
1941 checkpoint: None,
1942 },
1943 ),
1944 (
1945 RegionId::new(1, 2),
1946 RegionOpenRequest {
1947 engine: MITO_ENGINE_NAME.to_string(),
1948 table_dir: String::new(),
1949 path_type: PathType::Bare,
1950 options: Default::default(),
1951 skip_wal_replay: false,
1952 checkpoint: None,
1953 },
1954 ),
1955 ],
1956 false,
1957 )
1958 .await
1959 .unwrap_err();
1960 assert_eq!(err.status_code(), StatusCode::Unexpected);
1961 }
1962
    /// One table-driven case for `test_current_engine`.
    struct CurrentEngineTest {
        /// Region the lookup is performed for.
        region_id: RegionId,
        /// Status placed in the region map before the lookup; `None` removes the
        /// region from the map.
        current_region_status: Option<RegionEngineWithStatus>,
        /// Change passed to `get_engine`.
        region_change: RegionChange,
        /// Check applied to the lookup result.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
1969
1970 #[tokio::test]
1971 async fn test_current_engine() {
1972 common_telemetry::init_default_ut_logging();
1973
1974 let mut mock_region_server = mock_region_server();
1975 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1976 mock_region_server.register_engine(engine.clone());
1977
1978 let region_id = RegionId::new(1024, 1);
1979 let tests = vec![
1980 CurrentEngineTest {
1982 region_id,
1983 current_region_status: None,
1984 region_change: RegionChange::None,
1985 assert: Box::new(|result| {
1986 let err = result.unwrap_err();
1987 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1988 }),
1989 },
1990 CurrentEngineTest {
1991 region_id,
1992 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1993 region_change: RegionChange::None,
1994 assert: Box::new(|result| {
1995 let current_engine = result.unwrap();
1996 assert_matches!(current_engine, CurrentEngine::Engine(_));
1997 }),
1998 },
1999 CurrentEngineTest {
2000 region_id,
2001 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2002 region_change: RegionChange::None,
2003 assert: Box::new(|result| {
2004 let err = result.unwrap_err();
2005 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2006 }),
2007 },
2008 CurrentEngineTest {
2009 region_id,
2010 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2011 region_change: RegionChange::None,
2012 assert: Box::new(|result| {
2013 let err = result.unwrap_err();
2014 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2015 }),
2016 },
2017 CurrentEngineTest {
2019 region_id,
2020 current_region_status: None,
2021 region_change: RegionChange::Register(RegionAttribute::Mito),
2022 assert: Box::new(|result| {
2023 let current_engine = result.unwrap();
2024 assert_matches!(current_engine, CurrentEngine::Engine(_));
2025 }),
2026 },
2027 CurrentEngineTest {
2028 region_id,
2029 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2030 region_change: RegionChange::Register(RegionAttribute::Mito),
2031 assert: Box::new(|result| {
2032 let current_engine = result.unwrap();
2033 assert_matches!(current_engine, CurrentEngine::Engine(_));
2034 }),
2035 },
2036 CurrentEngineTest {
2037 region_id,
2038 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2039 region_change: RegionChange::Register(RegionAttribute::Mito),
2040 assert: Box::new(|result| {
2041 let err = result.unwrap_err();
2042 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2043 }),
2044 },
2045 CurrentEngineTest {
2046 region_id,
2047 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2048 region_change: RegionChange::Register(RegionAttribute::Mito),
2049 assert: Box::new(|result| {
2050 let current_engine = result.unwrap();
2051 assert_matches!(current_engine, CurrentEngine::Engine(_));
2052 }),
2053 },
2054 CurrentEngineTest {
2056 region_id,
2057 current_region_status: None,
2058 region_change: RegionChange::Deregisters,
2059 assert: Box::new(|result| {
2060 let current_engine = result.unwrap();
2061 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2062 }),
2063 },
2064 CurrentEngineTest {
2065 region_id,
2066 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2067 region_change: RegionChange::Deregisters,
2068 assert: Box::new(|result| {
2069 let err = result.unwrap_err();
2070 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2071 }),
2072 },
2073 CurrentEngineTest {
2074 region_id,
2075 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2076 region_change: RegionChange::Deregisters,
2077 assert: Box::new(|result| {
2078 let current_engine = result.unwrap();
2079 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2080 }),
2081 },
2082 CurrentEngineTest {
2083 region_id,
2084 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2085 region_change: RegionChange::Deregisters,
2086 assert: Box::new(|result| {
2087 let current_engine = result.unwrap();
2088 assert_matches!(current_engine, CurrentEngine::Engine(_));
2089 }),
2090 },
2091 ];
2092
2093 for test in tests {
2094 let CurrentEngineTest {
2095 region_id,
2096 current_region_status,
2097 region_change,
2098 assert,
2099 } = test;
2100
2101 if let Some(status) = current_region_status {
2103 mock_region_server
2104 .inner
2105 .region_map
2106 .insert(region_id, status);
2107 } else {
2108 mock_region_server.inner.region_map.remove(®ion_id);
2109 }
2110
2111 let result = mock_region_server
2112 .inner
2113 .get_engine(region_id, ®ion_change);
2114
2115 assert(result);
2116 }
2117 }
2118
2119 #[tokio::test]
2120 async fn test_region_server_parallelism() {
2121 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2122 let first_query = p.acquire().await;
2123 assert!(first_query.is_ok());
2124 let second_query = p.acquire().await;
2125 assert!(second_query.is_ok());
2126 let third_query = p.acquire().await;
2127 assert!(third_query.is_err());
2128 let err = third_query.unwrap_err();
2129 assert_eq!(
2130 err.output_msg(),
2131 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2132 );
2133 drop(first_query);
2134 let forth_query = p.acquire().await;
2135 assert!(forth_query.is_ok());
2136 }
2137
2138 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2139 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2140 metadata_builder.push_column_metadata(ColumnMetadata {
2141 column_schema: datatypes::schema::ColumnSchema::new(
2142 "timestamp",
2143 ConcreteDataType::timestamp_nanosecond_datatype(),
2144 false,
2145 ),
2146 semantic_type: SemanticType::Timestamp,
2147 column_id: 0,
2148 });
2149 metadata_builder.push_column_metadata(ColumnMetadata {
2150 column_schema: datatypes::schema::ColumnSchema::new(
2151 "file",
2152 ConcreteDataType::string_datatype(),
2153 true,
2154 ),
2155 semantic_type: SemanticType::Tag,
2156 column_id: 1,
2157 });
2158 metadata_builder.push_column_metadata(ColumnMetadata {
2159 column_schema: datatypes::schema::ColumnSchema::new(
2160 "message",
2161 ConcreteDataType::string_datatype(),
2162 true,
2163 ),
2164 semantic_type: SemanticType::Field,
2165 column_id: 2,
2166 });
2167 metadata_builder.primary_key(vec![1]);
2168 metadata_builder.build().unwrap()
2169 }
2170
2171 #[tokio::test]
2172 async fn test_handle_list_metadata_request() {
2173 common_telemetry::init_default_ut_logging();
2174
2175 let mut mock_region_server = mock_region_server();
2176 let region_id_1 = RegionId::new(1, 0);
2177 let region_id_2 = RegionId::new(2, 0);
2178
2179 let metadata_1 = mock_region_metadata(region_id_1);
2180 let metadata_2 = mock_region_metadata(region_id_2);
2181 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2182
2183 let metadata_1 = Arc::new(metadata_1);
2184 let metadata_2 = Arc::new(metadata_2);
2185 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2186 MITO_ENGINE_NAME,
2187 Box::new(move |region_id| {
2188 if region_id == region_id_1 {
2189 Ok(metadata_1.clone())
2190 } else if region_id == region_id_2 {
2191 Ok(metadata_2.clone())
2192 } else {
2193 error::RegionNotFoundSnafu { region_id }.fail()
2194 }
2195 }),
2196 );
2197
2198 mock_region_server.register_engine(engine.clone());
2199 mock_region_server
2200 .inner
2201 .region_map
2202 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2203 mock_region_server
2204 .inner
2205 .region_map
2206 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2207
2208 let list_metadata_request = ListMetadataRequest {
2210 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2211 };
2212 let response = mock_region_server
2213 .handle_list_metadata_request(&list_metadata_request)
2214 .await
2215 .unwrap();
2216 let decoded_metadata: Vec<Option<RegionMetadata>> =
2217 serde_json::from_slice(&response.metadata).unwrap();
2218 assert_eq!(metadatas, decoded_metadata);
2219 }
2220
2221 #[tokio::test]
2222 async fn test_handle_list_metadata_not_found() {
2223 common_telemetry::init_default_ut_logging();
2224
2225 let mut mock_region_server = mock_region_server();
2226 let region_id_1 = RegionId::new(1, 0);
2227 let region_id_2 = RegionId::new(2, 0);
2228
2229 let metadata_1 = mock_region_metadata(region_id_1);
2230 let metadatas = vec![Some(metadata_1.clone()), None];
2231
2232 let metadata_1 = Arc::new(metadata_1);
2233 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2234 MITO_ENGINE_NAME,
2235 Box::new(move |region_id| {
2236 if region_id == region_id_1 {
2237 Ok(metadata_1.clone())
2238 } else {
2239 error::RegionNotFoundSnafu { region_id }.fail()
2240 }
2241 }),
2242 );
2243
2244 mock_region_server.register_engine(engine.clone());
2245 mock_region_server
2246 .inner
2247 .region_map
2248 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2249
2250 let list_metadata_request = ListMetadataRequest {
2252 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2253 };
2254 let response = mock_region_server
2255 .handle_list_metadata_request(&list_metadata_request)
2256 .await
2257 .unwrap();
2258 let decoded_metadata: Vec<Option<RegionMetadata>> =
2259 serde_json::from_slice(&response.metadata).unwrap();
2260 assert_eq!(metadatas, decoded_metadata);
2261
2262 mock_region_server
2264 .inner
2265 .region_map
2266 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2267 let response = mock_region_server
2268 .handle_list_metadata_request(&list_metadata_request)
2269 .await
2270 .unwrap();
2271 let decoded_metadata: Vec<Option<RegionMetadata>> =
2272 serde_json::from_slice(&response.metadata).unwrap();
2273 assert_eq!(metadatas, decoded_metadata);
2274 }
2275
2276 #[tokio::test]
2277 async fn test_handle_list_metadata_failed() {
2278 common_telemetry::init_default_ut_logging();
2279
2280 let mut mock_region_server = mock_region_server();
2281 let region_id_1 = RegionId::new(1, 0);
2282
2283 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2284 MITO_ENGINE_NAME,
2285 Box::new(move |region_id| {
2286 error::UnexpectedSnafu {
2287 violated: format!("Failed to get region {region_id}"),
2288 }
2289 .fail()
2290 }),
2291 );
2292
2293 mock_region_server.register_engine(engine.clone());
2294 mock_region_server
2295 .inner
2296 .region_map
2297 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2298
2299 let list_metadata_request = ListMetadataRequest {
2301 region_ids: vec![region_id_1.as_u64()],
2302 };
2303 mock_region_server
2304 .handle_list_metadata_request(&list_metadata_request)
2305 .await
2306 .unwrap_err();
2307 }
2308}