1use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use api::region::RegionResponse;
22use api::v1::region::sync_request::ManifestInfo;
23use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest};
24use api::v1::{ResponseHeader, Status};
25use arrow_flight::{FlightData, Ticket};
26use async_trait::async_trait;
27use bytes::Bytes;
28use common_error::ext::BoxedError;
29use common_error::status_code::StatusCode;
30use common_query::request::QueryRequest;
31use common_query::OutputData;
32use common_recordbatch::SendableRecordBatchStream;
33use common_runtime::Runtime;
34use common_telemetry::tracing::{self, info_span};
35use common_telemetry::tracing_context::{FutureExt, TracingContext};
36use common_telemetry::{debug, error, info, warn};
37use dashmap::DashMap;
38use datafusion::datasource::{provider_as_source, TableProvider};
39use datafusion::error::Result as DfResult;
40use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
41use datafusion_expr::{LogicalPlan, TableSource};
42use futures_util::future::try_join_all;
43use metric_engine::engine::MetricEngine;
44use mito2::engine::MITO_ENGINE_NAME;
45use prost::Message;
46pub use query::dummy_catalog::{
47 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
48};
49use query::QueryEngineRef;
50use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
51use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
52use servers::grpc::region_server::RegionServerHandler;
53use session::context::{QueryContextBuilder, QueryContextRef};
54use snafu::{ensure, OptionExt, ResultExt};
55use store_api::metric_engine_consts::{
56 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
57};
58use store_api::region_engine::{
59 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
60 SettableRegionRoleState,
61};
62use store_api::region_request::{
63 AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
64};
65use store_api::storage::RegionId;
66use tokio::sync::{Semaphore, SemaphorePermit};
67use tokio::time::timeout;
68use tonic::{Request, Response, Result as TonicResult};
69
70use crate::error::{
71 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
72 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
73 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu,
74 HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
75 RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
76 StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
77};
78use crate::event_listener::RegionServerEventListenerRef;
79
80#[derive(Clone)]
81pub struct RegionServer {
82 inner: Arc<RegionServerInner>,
83}
84
85pub struct RegionStat {
86 pub region_id: RegionId,
87 pub engine: String,
88 pub role: RegionRole,
89}
90
91impl RegionServer {
92 pub fn new(
93 query_engine: QueryEngineRef,
94 runtime: Runtime,
95 event_listener: RegionServerEventListenerRef,
96 ) -> Self {
97 Self::with_table_provider(
98 query_engine,
99 runtime,
100 event_listener,
101 Arc::new(DummyTableProviderFactory),
102 0,
103 Duration::from_millis(0),
104 )
105 }
106
107 pub fn with_table_provider(
108 query_engine: QueryEngineRef,
109 runtime: Runtime,
110 event_listener: RegionServerEventListenerRef,
111 table_provider_factory: TableProviderFactoryRef,
112 max_concurrent_queries: usize,
113 concurrent_query_limiter_timeout: Duration,
114 ) -> Self {
115 Self {
116 inner: Arc::new(RegionServerInner::new(
117 query_engine,
118 runtime,
119 event_listener,
120 table_provider_factory,
121 RegionServerParallelism::from_opts(
122 max_concurrent_queries,
123 concurrent_query_limiter_timeout,
124 ),
125 )),
126 }
127 }
128
129 pub fn register_engine(&mut self, engine: RegionEngineRef) {
130 self.inner.register_engine(engine);
131 }
132
133 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
135 self.inner
136 .get_engine(region_id, &RegionChange::None)
137 .map(|x| match x {
138 CurrentEngine::Engine(engine) => Some(engine),
139 CurrentEngine::EarlyReturn(_) => None,
140 })
141 }
142
143 #[tracing::instrument(skip_all)]
144 pub async fn handle_batch_open_requests(
145 &self,
146 parallelism: usize,
147 requests: Vec<(RegionId, RegionOpenRequest)>,
148 ) -> Result<Vec<RegionId>> {
149 self.inner
150 .handle_batch_open_requests(parallelism, requests)
151 .await
152 }
153
154 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
155 pub async fn handle_request(
156 &self,
157 region_id: RegionId,
158 request: RegionRequest,
159 ) -> Result<RegionResponse> {
160 self.inner.handle_request(region_id, request).await
161 }
162
163 async fn table_provider(
165 &self,
166 region_id: RegionId,
167 ctx: Option<&session::context::QueryContext>,
168 ) -> Result<Arc<dyn TableProvider>> {
169 let status = self
170 .inner
171 .region_map
172 .get(®ion_id)
173 .context(RegionNotFoundSnafu { region_id })?
174 .clone();
175 ensure!(
176 matches!(status, RegionEngineWithStatus::Ready(_)),
177 RegionNotReadySnafu { region_id }
178 );
179
180 self.inner
181 .table_provider_factory
182 .create(region_id, status.into_engine(), ctx)
183 .await
184 .context(ExecuteLogicalPlanSnafu)
185 }
186
187 pub async fn handle_remote_read(
189 &self,
190 request: api::v1::region::QueryRequest,
191 ) -> Result<SendableRecordBatchStream> {
192 let _permit = if let Some(p) = &self.inner.parallelism {
193 Some(p.acquire().await?)
194 } else {
195 None
196 };
197
198 let query_ctx: QueryContextRef = request
199 .header
200 .as_ref()
201 .map(|h| Arc::new(h.into()))
202 .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));
203
204 let region_id = RegionId::from_u64(request.region_id);
205 let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
206 let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
207
208 let decoder = self
209 .inner
210 .query_engine
211 .engine_context(query_ctx)
212 .new_plan_decoder()
213 .context(NewPlanDecoderSnafu)?;
214
215 let plan = decoder
216 .decode(Bytes::from(request.plan), catalog_list, false)
217 .await
218 .context(DecodeLogicalPlanSnafu)?;
219
220 self.inner
221 .handle_read(QueryRequest {
222 header: request.header,
223 region_id,
224 plan,
225 })
226 .await
227 }
228
229 #[tracing::instrument(skip_all)]
230 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
231 let _permit = if let Some(p) = &self.inner.parallelism {
232 Some(p.acquire().await?)
233 } else {
234 None
235 };
236
237 let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
238
239 let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
240
241 struct RegionDataSourceInjector {
242 source: Arc<dyn TableSource>,
243 }
244
245 impl TreeNodeRewriter for RegionDataSourceInjector {
246 type Node = LogicalPlan;
247
248 fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
249 Ok(match node {
250 LogicalPlan::TableScan(mut scan) => {
251 scan.source = self.source.clone();
252 Transformed::yes(LogicalPlan::TableScan(scan))
253 }
254 _ => Transformed::no(node),
255 })
256 }
257 }
258
259 let plan = request
260 .plan
261 .rewrite(&mut RegionDataSourceInjector {
262 source: provider_as_source(provider),
263 })
264 .context(DataFusionSnafu)?
265 .data;
266
267 self.inner
268 .handle_read(QueryRequest { plan, ..request })
269 .await
270 }
271
272 pub fn reportable_regions(&self) -> Vec<RegionStat> {
276 self.inner
277 .region_map
278 .iter()
279 .filter_map(|e| {
280 let region_id = *e.key();
281 e.role(region_id).map(|role| RegionStat {
283 region_id,
284 engine: e.value().name().to_string(),
285 role,
286 })
287 })
288 .collect()
289 }
290
291 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
292 self.inner.region_map.get(®ion_id).and_then(|engine| {
293 engine.role(region_id).map(|role| match role {
294 RegionRole::Follower => false,
295 RegionRole::Leader => true,
296 RegionRole::DowngradingLeader => true,
297 })
298 })
299 }
300
301 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
302 let engine = self
303 .inner
304 .region_map
305 .get(®ion_id)
306 .with_context(|| RegionNotFoundSnafu { region_id })?;
307 engine
308 .set_region_role(region_id, role)
309 .with_context(|_| HandleRegionRequestSnafu { region_id })
310 }
311
312 pub async fn set_region_role_state_gracefully(
322 &self,
323 region_id: RegionId,
324 state: SettableRegionRoleState,
325 ) -> Result<SetRegionRoleStateResponse> {
326 match self.inner.region_map.get(®ion_id) {
327 Some(engine) => Ok(engine
328 .set_region_role_state_gracefully(region_id, state)
329 .await
330 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
331 None => Ok(SetRegionRoleStateResponse::NotFound),
332 }
333 }
334
335 pub fn runtime(&self) -> Runtime {
336 self.inner.runtime.clone()
337 }
338
339 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
340 match self.inner.region_map.get(®ion_id) {
341 Some(e) => e.region_statistic(region_id),
342 None => None,
343 }
344 }
345
346 pub async fn stop(&self) -> Result<()> {
348 self.inner.stop().await
349 }
350
351 #[cfg(test)]
352 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
354 self.inner
355 .region_map
356 .insert(region_id, RegionEngineWithStatus::Ready(engine));
357 }
358
359 async fn handle_batch_ddl_requests(
360 &self,
361 request: region_request::Body,
362 ) -> Result<RegionResponse> {
363 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
365 .context(BuildRegionRequestsSnafu)?
366 .unwrap();
367 let tracing_context = TracingContext::from_current_span();
368
369 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
370 self.inner
371 .handle_batch_request(batch_request)
372 .trace(span)
373 .await
374 }
375
376 async fn handle_requests_in_parallel(
377 &self,
378 request: region_request::Body,
379 ) -> Result<RegionResponse> {
380 let requests =
381 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
382 let tracing_context = TracingContext::from_current_span();
383
384 let join_tasks = requests.into_iter().map(|(region_id, req)| {
385 let self_to_move = self;
386 let span = tracing_context.attach(info_span!(
387 "RegionServer::handle_region_request",
388 region_id = region_id.to_string()
389 ));
390 async move {
391 self_to_move
392 .handle_request(region_id, req)
393 .trace(span)
394 .await
395 }
396 });
397
398 let results = try_join_all(join_tasks).await?;
399 let mut affected_rows = 0;
400 let mut extensions = HashMap::new();
401 for result in results {
402 affected_rows += result.affected_rows;
403 extensions.extend(result.extensions);
404 }
405
406 Ok(RegionResponse {
407 affected_rows,
408 extensions,
409 })
410 }
411
412 async fn handle_requests_in_serial(
413 &self,
414 request: region_request::Body,
415 ) -> Result<RegionResponse> {
416 let requests =
417 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
418 let tracing_context = TracingContext::from_current_span();
419
420 let mut affected_rows = 0;
421 let mut extensions = HashMap::new();
422 for (region_id, req) in requests {
425 let span = tracing_context.attach(info_span!(
426 "RegionServer::handle_region_request",
427 region_id = region_id.to_string()
428 ));
429 let result = self.handle_request(region_id, req).trace(span).await?;
430
431 affected_rows += result.affected_rows;
432 extensions.extend(result.extensions);
433 }
434
435 Ok(RegionResponse {
436 affected_rows,
437 extensions,
438 })
439 }
440
441 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
442 let region_id = RegionId::from_u64(request.region_id);
443 let manifest_info = request
444 .manifest_info
445 .context(error::MissingRequiredFieldSnafu {
446 name: "manifest_info",
447 })?;
448
449 let manifest_info = match manifest_info {
450 ManifestInfo::MitoManifestInfo(info) => {
451 RegionManifestInfo::mito(info.data_manifest_version, 0)
452 }
453 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
454 info.data_manifest_version,
455 0,
456 info.metadata_manifest_version,
457 0,
458 ),
459 };
460
461 let tracing_context = TracingContext::from_current_span();
462 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
463
464 self.sync_region(region_id, manifest_info)
465 .trace(span)
466 .await
467 .map(|_| RegionResponse::new(AffectedRows::default()))
468 }
469
470 pub async fn sync_region(
472 &self,
473 region_id: RegionId,
474 manifest_info: RegionManifestInfo,
475 ) -> Result<()> {
476 let engine_with_status = self
477 .inner
478 .region_map
479 .get(®ion_id)
480 .with_context(|| RegionNotFoundSnafu { region_id })?;
481
482 self.inner
483 .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
484 .await
485 }
486}
487
488#[async_trait]
489impl RegionServerHandler for RegionServer {
490 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
491 let response = match &request {
492 region_request::Body::Creates(_)
493 | region_request::Body::Drops(_)
494 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
495 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
496 self.handle_requests_in_parallel(request).await
497 }
498 region_request::Body::Sync(sync_request) => {
499 self.handle_sync_region_request(sync_request).await
500 }
501 _ => self.handle_requests_in_serial(request).await,
502 }
503 .map_err(BoxedError::new)
504 .context(ExecuteGrpcRequestSnafu)?;
505
506 Ok(RegionResponseV1 {
507 header: Some(ResponseHeader {
508 status: Some(Status {
509 status_code: StatusCode::Success as _,
510 ..Default::default()
511 }),
512 }),
513 affected_rows: response.affected_rows as _,
514 extensions: response.extensions,
515 })
516 }
517}
518
519#[async_trait]
520impl FlightCraft for RegionServer {
521 async fn do_get(
522 &self,
523 request: Request<Ticket>,
524 ) -> TonicResult<Response<TonicStream<FlightData>>> {
525 let ticket = request.into_inner().ticket;
526 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
527 .context(servers_error::InvalidFlightTicketSnafu)?;
528 let tracing_context = request
529 .header
530 .as_ref()
531 .map(|h| TracingContext::from_w3c(&h.tracing_context))
532 .unwrap_or_default();
533
534 let result = self
535 .handle_remote_read(request)
536 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
537 .await?;
538
539 let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
540 Ok(Response::new(stream))
541 }
542}
543
544#[derive(Clone)]
545enum RegionEngineWithStatus {
546 Registering(RegionEngineRef),
548 Deregistering(RegionEngineRef),
550 Ready(RegionEngineRef),
552}
553
554impl RegionEngineWithStatus {
555 pub fn into_engine(self) -> RegionEngineRef {
557 match self {
558 RegionEngineWithStatus::Registering(engine) => engine,
559 RegionEngineWithStatus::Deregistering(engine) => engine,
560 RegionEngineWithStatus::Ready(engine) => engine,
561 }
562 }
563
564 pub fn engine(&self) -> &RegionEngineRef {
566 match self {
567 RegionEngineWithStatus::Registering(engine) => engine,
568 RegionEngineWithStatus::Deregistering(engine) => engine,
569 RegionEngineWithStatus::Ready(engine) => engine,
570 }
571 }
572}
573
574impl Deref for RegionEngineWithStatus {
575 type Target = RegionEngineRef;
576
577 fn deref(&self) -> &Self::Target {
578 match self {
579 RegionEngineWithStatus::Registering(engine) => engine,
580 RegionEngineWithStatus::Deregistering(engine) => engine,
581 RegionEngineWithStatus::Ready(engine) => engine,
582 }
583 }
584}
585
586struct RegionServerInner {
587 engines: RwLock<HashMap<String, RegionEngineRef>>,
588 region_map: DashMap<RegionId, RegionEngineWithStatus>,
589 query_engine: QueryEngineRef,
590 runtime: Runtime,
591 event_listener: RegionServerEventListenerRef,
592 table_provider_factory: TableProviderFactoryRef,
593 parallelism: Option<RegionServerParallelism>,
596}
597
598struct RegionServerParallelism {
599 semaphore: Semaphore,
600 timeout: Duration,
601}
602
603impl RegionServerParallelism {
604 pub fn from_opts(
605 max_concurrent_queries: usize,
606 concurrent_query_limiter_timeout: Duration,
607 ) -> Option<Self> {
608 if max_concurrent_queries == 0 {
609 return None;
610 }
611 Some(RegionServerParallelism {
612 semaphore: Semaphore::new(max_concurrent_queries),
613 timeout: concurrent_query_limiter_timeout,
614 })
615 }
616
617 pub async fn acquire(&self) -> Result<SemaphorePermit> {
618 timeout(self.timeout, self.semaphore.acquire())
619 .await
620 .context(ConcurrentQueryLimiterTimeoutSnafu)?
621 .context(ConcurrentQueryLimiterClosedSnafu)
622 }
623}
624
625enum CurrentEngine {
626 Engine(RegionEngineRef),
627 EarlyReturn(AffectedRows),
628}
629
630impl Debug for CurrentEngine {
631 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 match self {
633 CurrentEngine::Engine(engine) => f
634 .debug_struct("CurrentEngine")
635 .field("engine", &engine.name())
636 .finish(),
637 CurrentEngine::EarlyReturn(rows) => f
638 .debug_struct("CurrentEngine")
639 .field("return", rows)
640 .finish(),
641 }
642 }
643}
644
645impl RegionServerInner {
646 pub fn new(
647 query_engine: QueryEngineRef,
648 runtime: Runtime,
649 event_listener: RegionServerEventListenerRef,
650 table_provider_factory: TableProviderFactoryRef,
651 parallelism: Option<RegionServerParallelism>,
652 ) -> Self {
653 Self {
654 engines: RwLock::new(HashMap::new()),
655 region_map: DashMap::new(),
656 query_engine,
657 runtime,
658 event_listener,
659 table_provider_factory,
660 parallelism,
661 }
662 }
663
664 pub fn register_engine(&self, engine: RegionEngineRef) {
665 let engine_name = engine.name();
666 info!("Region Engine {engine_name} is registered");
667 self.engines
668 .write()
669 .unwrap()
670 .insert(engine_name.to_string(), engine);
671 }
672
673 fn get_engine(
674 &self,
675 region_id: RegionId,
676 region_change: &RegionChange,
677 ) -> Result<CurrentEngine> {
678 let current_region_status = self.region_map.get(®ion_id);
679
680 let engine = match region_change {
681 RegionChange::Register(attribute) => match current_region_status {
682 Some(status) => match status.clone() {
683 RegionEngineWithStatus::Registering(engine) => engine,
684 RegionEngineWithStatus::Deregistering(_) => {
685 return error::RegionBusySnafu { region_id }.fail()
686 }
687 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
688 },
689 _ => self
690 .engines
691 .read()
692 .unwrap()
693 .get(attribute.engine())
694 .with_context(|| RegionEngineNotFoundSnafu {
695 name: attribute.engine(),
696 })?
697 .clone(),
698 },
699 RegionChange::Deregisters => match current_region_status {
700 Some(status) => match status.clone() {
701 RegionEngineWithStatus::Registering(_) => {
702 return error::RegionBusySnafu { region_id }.fail()
703 }
704 RegionEngineWithStatus::Deregistering(_) => {
705 return Ok(CurrentEngine::EarlyReturn(0))
706 }
707 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
708 },
709 None => return Ok(CurrentEngine::EarlyReturn(0)),
710 },
711 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
712 match current_region_status {
713 Some(status) => match status.clone() {
714 RegionEngineWithStatus::Registering(_) => {
715 return error::RegionNotReadySnafu { region_id }.fail()
716 }
717 RegionEngineWithStatus::Deregistering(_) => {
718 return error::RegionNotFoundSnafu { region_id }.fail()
719 }
720 RegionEngineWithStatus::Ready(engine) => engine,
721 },
722 None => return error::RegionNotFoundSnafu { region_id }.fail(),
723 }
724 }
725 };
726
727 Ok(CurrentEngine::Engine(engine))
728 }
729
730 async fn handle_batch_open_requests_inner(
731 &self,
732 engine: RegionEngineRef,
733 parallelism: usize,
734 requests: Vec<(RegionId, RegionOpenRequest)>,
735 ) -> Result<Vec<RegionId>> {
736 let region_changes = requests
737 .iter()
738 .map(|(region_id, open)| {
739 let attribute = parse_region_attribute(&open.engine, &open.options)?;
740 Ok((*region_id, RegionChange::Register(attribute)))
741 })
742 .collect::<Result<HashMap<_, _>>>()?;
743
744 for (®ion_id, region_change) in ®ion_changes {
745 self.set_region_status_not_ready(region_id, &engine, region_change)
746 }
747
748 let mut open_regions = Vec::with_capacity(requests.len());
749 let mut errors = vec![];
750 match engine
751 .handle_batch_open_requests(parallelism, requests)
752 .await
753 .with_context(|_| HandleBatchOpenRequestSnafu)
754 {
755 Ok(results) => {
756 for (region_id, result) in results {
757 let region_change = ®ion_changes[®ion_id];
758 match result {
759 Ok(_) => {
760 if let Err(e) = self
761 .set_region_status_ready(region_id, engine.clone(), *region_change)
762 .await
763 {
764 error!(e; "Failed to set region to ready: {}", region_id);
765 errors.push(BoxedError::new(e));
766 } else {
767 open_regions.push(region_id)
768 }
769 }
770 Err(e) => {
771 self.unset_region_status(region_id, &engine, *region_change);
772 error!(e; "Failed to open region: {}", region_id);
773 errors.push(e);
774 }
775 }
776 }
777 }
778 Err(e) => {
779 for (®ion_id, region_change) in ®ion_changes {
780 self.unset_region_status(region_id, &engine, *region_change);
781 }
782 error!(e; "Failed to open batch regions");
783 errors.push(BoxedError::new(e));
784 }
785 }
786
787 if !errors.is_empty() {
788 return error::UnexpectedSnafu {
789 violated: format!("Failed to open batch regions: {:?}", errors[0]),
791 }
792 .fail();
793 }
794
795 Ok(open_regions)
796 }
797
798 pub async fn handle_batch_open_requests(
799 &self,
800 parallelism: usize,
801 requests: Vec<(RegionId, RegionOpenRequest)>,
802 ) -> Result<Vec<RegionId>> {
803 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
804 HashMap::with_capacity(requests.len());
805 for (region_id, request) in requests {
806 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
807 requests.push((region_id, request));
808 } else {
809 engine_grouped_requests
810 .insert(request.engine.to_string(), vec![(region_id, request)]);
811 }
812 }
813
814 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
815 for (engine, requests) in engine_grouped_requests {
816 let engine = self
817 .engines
818 .read()
819 .unwrap()
820 .get(&engine)
821 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
822 .clone();
823 results.push(
824 self.handle_batch_open_requests_inner(engine, parallelism, requests)
825 .await,
826 )
827 }
828
829 Ok(results
830 .into_iter()
831 .collect::<Result<Vec<_>>>()?
832 .into_iter()
833 .flatten()
834 .collect::<Vec<_>>())
835 }
836
837 pub async fn handle_batch_request(
841 &self,
842 batch_request: BatchRegionDdlRequest,
843 ) -> Result<RegionResponse> {
844 let region_changes = match &batch_request {
845 BatchRegionDdlRequest::Create(requests) => requests
846 .iter()
847 .map(|(region_id, create)| {
848 let attribute = parse_region_attribute(&create.engine, &create.options)?;
849 Ok((*region_id, RegionChange::Register(attribute)))
850 })
851 .collect::<Result<Vec<_>>>()?,
852 BatchRegionDdlRequest::Drop(requests) => requests
853 .iter()
854 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
855 .collect::<Vec<_>>(),
856 BatchRegionDdlRequest::Alter(requests) => requests
857 .iter()
858 .map(|(region_id, _)| (*region_id, RegionChange::None))
859 .collect::<Vec<_>>(),
860 };
861
862 let (first_region_id, first_region_change) = region_changes.first().unwrap();
865 let engine = match self.get_engine(*first_region_id, first_region_change)? {
866 CurrentEngine::Engine(engine) => engine,
867 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
868 };
869
870 for (region_id, region_change) in region_changes.iter() {
871 self.set_region_status_not_ready(*region_id, &engine, region_change);
872 }
873
874 let ddl_type = batch_request.request_type();
875 let result = engine
876 .handle_batch_ddl_requests(batch_request)
877 .await
878 .context(HandleBatchDdlRequestSnafu { ddl_type });
879
880 match result {
881 Ok(result) => {
882 for (region_id, region_change) in ®ion_changes {
883 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
884 .await?;
885 }
886
887 Ok(RegionResponse {
888 affected_rows: result.affected_rows,
889 extensions: result.extensions,
890 })
891 }
892 Err(err) => {
893 for (region_id, region_change) in region_changes {
894 self.unset_region_status(region_id, &engine, region_change);
895 }
896
897 Err(err)
898 }
899 }
900 }
901
902 pub async fn handle_request(
903 &self,
904 region_id: RegionId,
905 request: RegionRequest,
906 ) -> Result<RegionResponse> {
907 let request_type = request.request_type();
908 let region_id_str = region_id.to_string();
909 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
910 .with_label_values(&[®ion_id_str, request_type])
911 .start_timer();
912
913 let region_change = match &request {
914 RegionRequest::Create(create) => {
915 let attribute = parse_region_attribute(&create.engine, &create.options)?;
916 RegionChange::Register(attribute)
917 }
918 RegionRequest::Open(open) => {
919 let attribute = parse_region_attribute(&open.engine, &open.options)?;
920 RegionChange::Register(attribute)
921 }
922 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
923 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
924 RegionChange::Ingest
925 }
926 RegionRequest::Alter(_)
927 | RegionRequest::Flush(_)
928 | RegionRequest::Compact(_)
929 | RegionRequest::Truncate(_) => RegionChange::None,
930 RegionRequest::Catchup(_) => RegionChange::Catchup,
931 };
932
933 let engine = match self.get_engine(region_id, ®ion_change)? {
934 CurrentEngine::Engine(engine) => engine,
935 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
936 };
937
938 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
940
941 match engine
942 .handle_request(region_id, request)
943 .await
944 .with_context(|_| HandleRegionRequestSnafu { region_id })
945 {
946 Ok(result) => {
947 if matches!(region_change, RegionChange::Ingest) {
949 crate::metrics::REGION_CHANGED_ROW_COUNT
950 .with_label_values(&[®ion_id_str, request_type])
951 .inc_by(result.affected_rows as u64);
952 }
953 self.set_region_status_ready(region_id, engine.clone(), region_change)
955 .await?;
956
957 Ok(RegionResponse {
958 affected_rows: result.affected_rows,
959 extensions: result.extensions,
960 })
961 }
962 Err(err) => {
963 self.unset_region_status(region_id, &engine, region_change);
965 Err(err)
966 }
967 }
968 }
969
970 pub async fn handle_sync_region(
972 &self,
973 engine: &RegionEngineRef,
974 region_id: RegionId,
975 manifest_info: RegionManifestInfo,
976 ) -> Result<()> {
977 let Some(new_opened_regions) = engine
978 .sync_region(region_id, manifest_info)
979 .await
980 .with_context(|_| HandleRegionRequestSnafu { region_id })?
981 .new_opened_logical_region_ids()
982 else {
983 warn!("No new opened logical regions");
984 return Ok(());
985 };
986
987 for region in new_opened_regions {
988 self.region_map
989 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
990 info!("Logical region {} is registered!", region);
991 }
992
993 Ok(())
994 }
995
996 fn set_region_status_not_ready(
997 &self,
998 region_id: RegionId,
999 engine: &RegionEngineRef,
1000 region_change: &RegionChange,
1001 ) {
1002 match region_change {
1003 RegionChange::Register(_) => {
1004 self.region_map.insert(
1005 region_id,
1006 RegionEngineWithStatus::Registering(engine.clone()),
1007 );
1008 }
1009 RegionChange::Deregisters => {
1010 self.region_map.insert(
1011 region_id,
1012 RegionEngineWithStatus::Deregistering(engine.clone()),
1013 );
1014 }
1015 _ => {}
1016 }
1017 }
1018
1019 fn unset_region_status(
1020 &self,
1021 region_id: RegionId,
1022 engine: &RegionEngineRef,
1023 region_change: RegionChange,
1024 ) {
1025 match region_change {
1026 RegionChange::None | RegionChange::Ingest => {}
1027 RegionChange::Register(_) => {
1028 self.region_map.remove(®ion_id);
1029 }
1030 RegionChange::Deregisters => {
1031 self.region_map
1032 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1033 }
1034 RegionChange::Catchup => {}
1035 }
1036 }
1037
1038 async fn set_region_status_ready(
1039 &self,
1040 region_id: RegionId,
1041 engine: RegionEngineRef,
1042 region_change: RegionChange,
1043 ) -> Result<()> {
1044 let engine_type = engine.name();
1045 match region_change {
1046 RegionChange::None | RegionChange::Ingest => {}
1047 RegionChange::Register(attribute) => {
1048 info!(
1049 "Region {region_id} is registered to engine {}",
1050 attribute.engine()
1051 );
1052 self.region_map
1053 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1054
1055 match attribute {
1056 RegionAttribute::Metric { physical } => {
1057 if physical {
1058 self.register_logical_regions(&engine, region_id).await?;
1060 self.event_listener.on_region_registered(region_id);
1062 }
1063 }
1064 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1065 RegionAttribute::File => {
1066 }
1068 }
1069 }
1070 RegionChange::Deregisters => {
1071 info!("Region {region_id} is deregistered from engine {engine_type}");
1072 self.region_map
1073 .remove(®ion_id)
1074 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1075 self.event_listener.on_region_deregistered(region_id);
1076 }
1077 RegionChange::Catchup => {
1078 if is_metric_engine(engine.name()) {
1079 self.register_logical_regions(&engine, region_id).await?;
1081 }
1082 }
1083 }
1084 Ok(())
1085 }
1086
1087 async fn register_logical_regions(
1088 &self,
1089 engine: &RegionEngineRef,
1090 physical_region_id: RegionId,
1091 ) -> Result<()> {
1092 let metric_engine =
1093 engine
1094 .as_any()
1095 .downcast_ref::<MetricEngine>()
1096 .context(UnexpectedSnafu {
1097 violated: format!(
1098 "expecting engine type '{}', actual '{}'",
1099 METRIC_ENGINE_NAME,
1100 engine.name(),
1101 ),
1102 })?;
1103
1104 let logical_regions = metric_engine
1105 .logical_regions(physical_region_id)
1106 .await
1107 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1108
1109 for region in logical_regions {
1110 self.region_map
1111 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1112 info!("Logical region {} is registered!", region);
1113 }
1114 Ok(())
1115 }
1116
1117 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
1118 let query_ctx: QueryContextRef = request
1122 .header
1123 .as_ref()
1124 .map(|h| Arc::new(h.into()))
1125 .unwrap_or_else(|| QueryContextBuilder::default().build().into());
1126
1127 let result = self
1128 .query_engine
1129 .execute(request.plan, query_ctx)
1130 .await
1131 .context(ExecuteLogicalPlanSnafu)?;
1132
1133 match result.data {
1134 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1135 UnsupportedOutputSnafu { expected: "stream" }.fail()
1136 }
1137 OutputData::Stream(stream) => Ok(stream),
1138 }
1139 }
1140
1141 async fn stop(&self) -> Result<()> {
1142 let regions = self
1150 .region_map
1151 .iter()
1152 .map(|x| (*x.key(), x.value().clone()))
1153 .collect::<Vec<_>>();
1154 let num_regions = regions.len();
1155
1156 for (region_id, engine) in regions {
1157 let closed = engine
1158 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1159 .await;
1160 match closed {
1161 Ok(_) => debug!("Region {region_id} is closed"),
1162 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1163 }
1164 }
1165 self.region_map.clear();
1166 info!("closed {num_regions} regions");
1167
1168 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1169 for (engine_name, engine) in engines {
1170 engine
1171 .stop()
1172 .await
1173 .context(StopRegionEngineSnafu { name: &engine_name })?;
1174 info!("Region engine {engine_name} is stopped");
1175 }
1176
1177 Ok(())
1178 }
1179}
1180
1181#[derive(Debug, Clone, Copy)]
1182enum RegionChange {
1183 None,
1184 Register(RegionAttribute),
1185 Deregisters,
1186 Catchup,
1187 Ingest,
1188}
1189
1190fn is_metric_engine(engine: &str) -> bool {
1191 engine == METRIC_ENGINE_NAME
1192}
1193
1194fn parse_region_attribute(
1195 engine: &str,
1196 options: &HashMap<String, String>,
1197) -> Result<RegionAttribute> {
1198 match engine {
1199 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1200 METRIC_ENGINE_NAME => {
1201 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1202
1203 Ok(RegionAttribute::Metric { physical })
1204 }
1205 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1206 _ => error::UnexpectedSnafu {
1207 violated: format!("Unknown engine: {}", engine),
1208 }
1209 .fail(),
1210 }
1211}
1212
1213#[derive(Debug, Clone, Copy)]
1214enum RegionAttribute {
1215 Mito,
1216 Metric { physical: bool },
1217 File,
1218}
1219
1220impl RegionAttribute {
1221 fn engine(&self) -> &'static str {
1222 match self {
1223 RegionAttribute::Mito => MITO_ENGINE_NAME,
1224 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1225 RegionAttribute::File => FILE_ENGINE_NAME,
1226 }
1227 }
1228}
1229
1230#[cfg(test)]
1231mod tests {
1232
1233 use std::assert_matches::assert_matches;
1234
1235 use common_error::ext::ErrorExt;
1236 use mito2::test_util::CreateRequestBuilder;
1237 use store_api::region_engine::RegionEngine;
1238 use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
1239 use store_api::storage::RegionId;
1240
1241 use super::*;
1242 use crate::error::Result;
1243 use crate::tests::{mock_region_server, MockRegionEngine};
1244
1245 #[tokio::test]
1246 async fn test_region_registering() {
1247 common_telemetry::init_default_ut_logging();
1248
1249 let mut mock_region_server = mock_region_server();
1250 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1251 let engine_name = engine.name();
1252 mock_region_server.register_engine(engine.clone());
1253 let region_id = RegionId::new(1, 1);
1254 let builder = CreateRequestBuilder::new();
1255 let create_req = builder.build();
1256 mock_region_server.inner.region_map.insert(
1258 region_id,
1259 RegionEngineWithStatus::Registering(engine.clone()),
1260 );
1261 let response = mock_region_server
1262 .handle_request(region_id, RegionRequest::Create(create_req))
1263 .await
1264 .unwrap();
1265 assert_eq!(response.affected_rows, 0);
1266 let status = mock_region_server
1267 .inner
1268 .region_map
1269 .get(®ion_id)
1270 .unwrap()
1271 .clone();
1272 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1273
1274 mock_region_server.inner.region_map.insert(
1275 region_id,
1276 RegionEngineWithStatus::Registering(engine.clone()),
1277 );
1278 let response = mock_region_server
1279 .handle_request(
1280 region_id,
1281 RegionRequest::Open(RegionOpenRequest {
1282 engine: engine_name.to_string(),
1283 region_dir: String::new(),
1284 options: Default::default(),
1285 skip_wal_replay: false,
1286 }),
1287 )
1288 .await
1289 .unwrap();
1290 assert_eq!(response.affected_rows, 0);
1291 let status = mock_region_server
1292 .inner
1293 .region_map
1294 .get(®ion_id)
1295 .unwrap()
1296 .clone();
1297 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1298 }
1299
1300 #[tokio::test]
1301 async fn test_region_deregistering() {
1302 common_telemetry::init_default_ut_logging();
1303
1304 let mut mock_region_server = mock_region_server();
1305 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1306
1307 mock_region_server.register_engine(engine.clone());
1308
1309 let region_id = RegionId::new(1, 1);
1310
1311 mock_region_server.inner.region_map.insert(
1313 region_id,
1314 RegionEngineWithStatus::Deregistering(engine.clone()),
1315 );
1316
1317 let response = mock_region_server
1318 .handle_request(
1319 region_id,
1320 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1321 )
1322 .await
1323 .unwrap();
1324 assert_eq!(response.affected_rows, 0);
1325
1326 let status = mock_region_server
1327 .inner
1328 .region_map
1329 .get(®ion_id)
1330 .unwrap()
1331 .clone();
1332 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1333
1334 mock_region_server.inner.region_map.insert(
1335 region_id,
1336 RegionEngineWithStatus::Deregistering(engine.clone()),
1337 );
1338
1339 let response = mock_region_server
1340 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1341 .await
1342 .unwrap();
1343 assert_eq!(response.affected_rows, 0);
1344
1345 let status = mock_region_server
1346 .inner
1347 .region_map
1348 .get(®ion_id)
1349 .unwrap()
1350 .clone();
1351 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1352 }
1353
1354 #[tokio::test]
1355 async fn test_region_not_ready() {
1356 common_telemetry::init_default_ut_logging();
1357
1358 let mut mock_region_server = mock_region_server();
1359 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1360
1361 mock_region_server.register_engine(engine.clone());
1362
1363 let region_id = RegionId::new(1, 1);
1364
1365 mock_region_server.inner.region_map.insert(
1367 region_id,
1368 RegionEngineWithStatus::Registering(engine.clone()),
1369 );
1370
1371 let err = mock_region_server
1372 .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
1373 .await
1374 .unwrap_err();
1375
1376 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1377 }
1378
1379 #[tokio::test]
1380 async fn test_region_request_failed() {
1381 common_telemetry::init_default_ut_logging();
1382
1383 let mut mock_region_server = mock_region_server();
1384 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1385 MITO_ENGINE_NAME,
1386 Box::new(|_region_id, _request| {
1387 error::UnexpectedSnafu {
1388 violated: "test".to_string(),
1389 }
1390 .fail()
1391 }),
1392 );
1393
1394 mock_region_server.register_engine(engine.clone());
1395
1396 let region_id = RegionId::new(1, 1);
1397 let builder = CreateRequestBuilder::new();
1398 let create_req = builder.build();
1399 mock_region_server
1400 .handle_request(region_id, RegionRequest::Create(create_req))
1401 .await
1402 .unwrap_err();
1403
1404 let status = mock_region_server.inner.region_map.get(®ion_id);
1405 assert!(status.is_none());
1406
1407 mock_region_server
1408 .inner
1409 .region_map
1410 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1411
1412 mock_region_server
1413 .handle_request(
1414 region_id,
1415 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1416 )
1417 .await
1418 .unwrap_err();
1419
1420 let status = mock_region_server.inner.region_map.get(®ion_id);
1421 assert!(status.is_some());
1422 }
1423
1424 struct CurrentEngineTest {
1425 region_id: RegionId,
1426 current_region_status: Option<RegionEngineWithStatus>,
1427 region_change: RegionChange,
1428 assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1429 }
1430
1431 #[tokio::test]
1432 async fn test_current_engine() {
1433 common_telemetry::init_default_ut_logging();
1434
1435 let mut mock_region_server = mock_region_server();
1436 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1437 mock_region_server.register_engine(engine.clone());
1438
1439 let region_id = RegionId::new(1024, 1);
1440 let tests = vec![
1441 CurrentEngineTest {
1443 region_id,
1444 current_region_status: None,
1445 region_change: RegionChange::None,
1446 assert: Box::new(|result| {
1447 let err = result.unwrap_err();
1448 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1449 }),
1450 },
1451 CurrentEngineTest {
1452 region_id,
1453 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1454 region_change: RegionChange::None,
1455 assert: Box::new(|result| {
1456 let current_engine = result.unwrap();
1457 assert_matches!(current_engine, CurrentEngine::Engine(_));
1458 }),
1459 },
1460 CurrentEngineTest {
1461 region_id,
1462 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1463 region_change: RegionChange::None,
1464 assert: Box::new(|result| {
1465 let err = result.unwrap_err();
1466 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1467 }),
1468 },
1469 CurrentEngineTest {
1470 region_id,
1471 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1472 region_change: RegionChange::None,
1473 assert: Box::new(|result| {
1474 let err = result.unwrap_err();
1475 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1476 }),
1477 },
1478 CurrentEngineTest {
1480 region_id,
1481 current_region_status: None,
1482 region_change: RegionChange::Register(RegionAttribute::Mito),
1483 assert: Box::new(|result| {
1484 let current_engine = result.unwrap();
1485 assert_matches!(current_engine, CurrentEngine::Engine(_));
1486 }),
1487 },
1488 CurrentEngineTest {
1489 region_id,
1490 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1491 region_change: RegionChange::Register(RegionAttribute::Mito),
1492 assert: Box::new(|result| {
1493 let current_engine = result.unwrap();
1494 assert_matches!(current_engine, CurrentEngine::Engine(_));
1495 }),
1496 },
1497 CurrentEngineTest {
1498 region_id,
1499 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1500 region_change: RegionChange::Register(RegionAttribute::Mito),
1501 assert: Box::new(|result| {
1502 let err = result.unwrap_err();
1503 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1504 }),
1505 },
1506 CurrentEngineTest {
1507 region_id,
1508 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1509 region_change: RegionChange::Register(RegionAttribute::Mito),
1510 assert: Box::new(|result| {
1511 let current_engine = result.unwrap();
1512 assert_matches!(current_engine, CurrentEngine::Engine(_));
1513 }),
1514 },
1515 CurrentEngineTest {
1517 region_id,
1518 current_region_status: None,
1519 region_change: RegionChange::Deregisters,
1520 assert: Box::new(|result| {
1521 let current_engine = result.unwrap();
1522 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1523 }),
1524 },
1525 CurrentEngineTest {
1526 region_id,
1527 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1528 region_change: RegionChange::Deregisters,
1529 assert: Box::new(|result| {
1530 let err = result.unwrap_err();
1531 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1532 }),
1533 },
1534 CurrentEngineTest {
1535 region_id,
1536 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1537 region_change: RegionChange::Deregisters,
1538 assert: Box::new(|result| {
1539 let current_engine = result.unwrap();
1540 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1541 }),
1542 },
1543 CurrentEngineTest {
1544 region_id,
1545 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1546 region_change: RegionChange::Deregisters,
1547 assert: Box::new(|result| {
1548 let current_engine = result.unwrap();
1549 assert_matches!(current_engine, CurrentEngine::Engine(_));
1550 }),
1551 },
1552 ];
1553
1554 for test in tests {
1555 let CurrentEngineTest {
1556 region_id,
1557 current_region_status,
1558 region_change,
1559 assert,
1560 } = test;
1561
1562 if let Some(status) = current_region_status {
1564 mock_region_server
1565 .inner
1566 .region_map
1567 .insert(region_id, status);
1568 } else {
1569 mock_region_server.inner.region_map.remove(®ion_id);
1570 }
1571
1572 let result = mock_region_server
1573 .inner
1574 .get_engine(region_id, ®ion_change);
1575
1576 assert(result);
1577 }
1578 }
1579
1580 #[tokio::test]
1581 async fn test_region_server_parallelism() {
1582 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1583 let first_query = p.acquire().await;
1584 assert!(first_query.is_ok());
1585 let second_query = p.acquire().await;
1586 assert!(second_query.is_ok());
1587 let third_query = p.acquire().await;
1588 assert!(third_query.is_err());
1589 let err = third_query.unwrap_err();
1590 assert_eq!(
1591 err.output_msg(),
1592 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1593 );
1594 drop(first_query);
1595 let forth_query = p.acquire().await;
1596 assert!(forth_query.is_ok());
1597 }
1598}