datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use api::region::RegionResponse;
22use api::v1::region::sync_request::ManifestInfo;
23use api::v1::region::{region_request, RegionResponse as RegionResponseV1, SyncRequest};
24use api::v1::{ResponseHeader, Status};
25use arrow_flight::{FlightData, Ticket};
26use async_trait::async_trait;
27use bytes::Bytes;
28use common_error::ext::BoxedError;
29use common_error::status_code::StatusCode;
30use common_query::request::QueryRequest;
31use common_query::OutputData;
32use common_recordbatch::SendableRecordBatchStream;
33use common_runtime::Runtime;
34use common_telemetry::tracing::{self, info_span};
35use common_telemetry::tracing_context::{FutureExt, TracingContext};
36use common_telemetry::{debug, error, info, warn};
37use dashmap::DashMap;
38use datafusion::datasource::{provider_as_source, TableProvider};
39use datafusion::error::Result as DfResult;
40use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
41use datafusion_expr::{LogicalPlan, TableSource};
42use futures_util::future::try_join_all;
43use metric_engine::engine::MetricEngine;
44use mito2::engine::MITO_ENGINE_NAME;
45use prost::Message;
46pub use query::dummy_catalog::{
47    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
48};
49use query::QueryEngineRef;
50use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
51use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
52use servers::grpc::region_server::RegionServerHandler;
53use session::context::{QueryContextBuilder, QueryContextRef};
54use snafu::{ensure, OptionExt, ResultExt};
55use store_api::metric_engine_consts::{
56    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
57};
58use store_api::region_engine::{
59    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
60    SettableRegionRoleState,
61};
62use store_api::region_request::{
63    AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
64};
65use store_api::storage::RegionId;
66use tokio::sync::{Semaphore, SemaphorePermit};
67use tokio::time::timeout;
68use tonic::{Request, Response, Result as TonicResult};
69
70use crate::error::{
71    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
72    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
73    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu,
74    HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
75    RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
76    StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
77};
78use crate::event_listener::RegionServerEventListenerRef;
79
/// Cloneable handle to the region server.
///
/// All state lives in the shared [`RegionServerInner`] behind an `Arc`, so
/// clones refer to the same underlying server.
#[derive(Clone)]
pub struct RegionServer {
    inner: Arc<RegionServerInner>,
}
84
/// Summary of a single region, as produced by [`RegionServer::reportable_regions`].
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine serving the region.
    pub engine: String,
    /// Role the engine reported for the region.
    pub role: RegionRole,
}
90
91impl RegionServer {
92    pub fn new(
93        query_engine: QueryEngineRef,
94        runtime: Runtime,
95        event_listener: RegionServerEventListenerRef,
96    ) -> Self {
97        Self::with_table_provider(
98            query_engine,
99            runtime,
100            event_listener,
101            Arc::new(DummyTableProviderFactory),
102            0,
103            Duration::from_millis(0),
104        )
105    }
106
107    pub fn with_table_provider(
108        query_engine: QueryEngineRef,
109        runtime: Runtime,
110        event_listener: RegionServerEventListenerRef,
111        table_provider_factory: TableProviderFactoryRef,
112        max_concurrent_queries: usize,
113        concurrent_query_limiter_timeout: Duration,
114    ) -> Self {
115        Self {
116            inner: Arc::new(RegionServerInner::new(
117                query_engine,
118                runtime,
119                event_listener,
120                table_provider_factory,
121                RegionServerParallelism::from_opts(
122                    max_concurrent_queries,
123                    concurrent_query_limiter_timeout,
124                ),
125            )),
126        }
127    }
128
129    pub fn register_engine(&mut self, engine: RegionEngineRef) {
130        self.inner.register_engine(engine);
131    }
132
133    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
134    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
135        self.inner
136            .get_engine(region_id, &RegionChange::None)
137            .map(|x| match x {
138                CurrentEngine::Engine(engine) => Some(engine),
139                CurrentEngine::EarlyReturn(_) => None,
140            })
141    }
142
143    #[tracing::instrument(skip_all)]
144    pub async fn handle_batch_open_requests(
145        &self,
146        parallelism: usize,
147        requests: Vec<(RegionId, RegionOpenRequest)>,
148    ) -> Result<Vec<RegionId>> {
149        self.inner
150            .handle_batch_open_requests(parallelism, requests)
151            .await
152    }
153
154    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
155    pub async fn handle_request(
156        &self,
157        region_id: RegionId,
158        request: RegionRequest,
159    ) -> Result<RegionResponse> {
160        self.inner.handle_request(region_id, request).await
161    }
162
163    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
164    async fn table_provider(
165        &self,
166        region_id: RegionId,
167        ctx: Option<&session::context::QueryContext>,
168    ) -> Result<Arc<dyn TableProvider>> {
169        let status = self
170            .inner
171            .region_map
172            .get(&region_id)
173            .context(RegionNotFoundSnafu { region_id })?
174            .clone();
175        ensure!(
176            matches!(status, RegionEngineWithStatus::Ready(_)),
177            RegionNotReadySnafu { region_id }
178        );
179
180        self.inner
181            .table_provider_factory
182            .create(region_id, status.into_engine(), ctx)
183            .await
184            .context(ExecuteLogicalPlanSnafu)
185    }
186
187    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
188    pub async fn handle_remote_read(
189        &self,
190        request: api::v1::region::QueryRequest,
191    ) -> Result<SendableRecordBatchStream> {
192        let _permit = if let Some(p) = &self.inner.parallelism {
193            Some(p.acquire().await?)
194        } else {
195            None
196        };
197
198        let query_ctx: QueryContextRef = request
199            .header
200            .as_ref()
201            .map(|h| Arc::new(h.into()))
202            .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));
203
204        let region_id = RegionId::from_u64(request.region_id);
205        let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
206        let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
207
208        let decoder = self
209            .inner
210            .query_engine
211            .engine_context(query_ctx)
212            .new_plan_decoder()
213            .context(NewPlanDecoderSnafu)?;
214
215        let plan = decoder
216            .decode(Bytes::from(request.plan), catalog_list, false)
217            .await
218            .context(DecodeLogicalPlanSnafu)?;
219
220        self.inner
221            .handle_read(QueryRequest {
222                header: request.header,
223                region_id,
224                plan,
225            })
226            .await
227    }
228
229    #[tracing::instrument(skip_all)]
230    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
231        let _permit = if let Some(p) = &self.inner.parallelism {
232            Some(p.acquire().await?)
233        } else {
234            None
235        };
236
237        let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
238
239        let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
240
241        struct RegionDataSourceInjector {
242            source: Arc<dyn TableSource>,
243        }
244
245        impl TreeNodeRewriter for RegionDataSourceInjector {
246            type Node = LogicalPlan;
247
248            fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
249                Ok(match node {
250                    LogicalPlan::TableScan(mut scan) => {
251                        scan.source = self.source.clone();
252                        Transformed::yes(LogicalPlan::TableScan(scan))
253                    }
254                    _ => Transformed::no(node),
255                })
256            }
257        }
258
259        let plan = request
260            .plan
261            .rewrite(&mut RegionDataSourceInjector {
262                source: provider_as_source(provider),
263            })
264            .context(DataFusionSnafu)?
265            .data;
266
267        self.inner
268            .handle_read(QueryRequest { plan, ..request })
269            .await
270    }
271
272    /// Returns all opened and reportable regions.
273    ///
274    /// Notes: except all metrics regions.
275    pub fn reportable_regions(&self) -> Vec<RegionStat> {
276        self.inner
277            .region_map
278            .iter()
279            .filter_map(|e| {
280                let region_id = *e.key();
281                // Filters out any regions whose role equals None.
282                e.role(region_id).map(|role| RegionStat {
283                    region_id,
284                    engine: e.value().name().to_string(),
285                    role,
286                })
287            })
288            .collect()
289    }
290
291    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
292        self.inner.region_map.get(&region_id).and_then(|engine| {
293            engine.role(region_id).map(|role| match role {
294                RegionRole::Follower => false,
295                RegionRole::Leader => true,
296                RegionRole::DowngradingLeader => true,
297            })
298        })
299    }
300
301    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
302        let engine = self
303            .inner
304            .region_map
305            .get(&region_id)
306            .with_context(|| RegionNotFoundSnafu { region_id })?;
307        engine
308            .set_region_role(region_id, role)
309            .with_context(|_| HandleRegionRequestSnafu { region_id })
310    }
311
312    /// Set region role state gracefully.
313    ///
314    /// For [SettableRegionRoleState::Follower]:
315    /// After the call returns, the engine ensures that
316    /// no **further** write or flush operations will succeed in this region.
317    ///
318    /// For [SettableRegionRoleState::DowngradingLeader]:
319    /// After the call returns, the engine ensures that
320    /// no **further** write operations will succeed in this region.
321    pub async fn set_region_role_state_gracefully(
322        &self,
323        region_id: RegionId,
324        state: SettableRegionRoleState,
325    ) -> Result<SetRegionRoleStateResponse> {
326        match self.inner.region_map.get(&region_id) {
327            Some(engine) => Ok(engine
328                .set_region_role_state_gracefully(region_id, state)
329                .await
330                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
331            None => Ok(SetRegionRoleStateResponse::NotFound),
332        }
333    }
334
335    pub fn runtime(&self) -> Runtime {
336        self.inner.runtime.clone()
337    }
338
339    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
340        match self.inner.region_map.get(&region_id) {
341            Some(e) => e.region_statistic(region_id),
342            None => None,
343        }
344    }
345
346    /// Stop the region server.
347    pub async fn stop(&self) -> Result<()> {
348        self.inner.stop().await
349    }
350
351    #[cfg(test)]
352    /// Registers a region for test purpose.
353    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
354        self.inner
355            .region_map
356            .insert(region_id, RegionEngineWithStatus::Ready(engine));
357    }
358
359    async fn handle_batch_ddl_requests(
360        &self,
361        request: region_request::Body,
362    ) -> Result<RegionResponse> {
363        // Safety: we have already checked the request type in `RegionServer::handle()`.
364        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
365            .context(BuildRegionRequestsSnafu)?
366            .unwrap();
367        let tracing_context = TracingContext::from_current_span();
368
369        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
370        self.inner
371            .handle_batch_request(batch_request)
372            .trace(span)
373            .await
374    }
375
376    async fn handle_requests_in_parallel(
377        &self,
378        request: region_request::Body,
379    ) -> Result<RegionResponse> {
380        let requests =
381            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
382        let tracing_context = TracingContext::from_current_span();
383
384        let join_tasks = requests.into_iter().map(|(region_id, req)| {
385            let self_to_move = self;
386            let span = tracing_context.attach(info_span!(
387                "RegionServer::handle_region_request",
388                region_id = region_id.to_string()
389            ));
390            async move {
391                self_to_move
392                    .handle_request(region_id, req)
393                    .trace(span)
394                    .await
395            }
396        });
397
398        let results = try_join_all(join_tasks).await?;
399        let mut affected_rows = 0;
400        let mut extensions = HashMap::new();
401        for result in results {
402            affected_rows += result.affected_rows;
403            extensions.extend(result.extensions);
404        }
405
406        Ok(RegionResponse {
407            affected_rows,
408            extensions,
409        })
410    }
411
412    async fn handle_requests_in_serial(
413        &self,
414        request: region_request::Body,
415    ) -> Result<RegionResponse> {
416        let requests =
417            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
418        let tracing_context = TracingContext::from_current_span();
419
420        let mut affected_rows = 0;
421        let mut extensions = HashMap::new();
422        // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
423        // modify this part to avoid inefficient serial loop calls.
424        for (region_id, req) in requests {
425            let span = tracing_context.attach(info_span!(
426                "RegionServer::handle_region_request",
427                region_id = region_id.to_string()
428            ));
429            let result = self.handle_request(region_id, req).trace(span).await?;
430
431            affected_rows += result.affected_rows;
432            extensions.extend(result.extensions);
433        }
434
435        Ok(RegionResponse {
436            affected_rows,
437            extensions,
438        })
439    }
440
441    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
442        let region_id = RegionId::from_u64(request.region_id);
443        let manifest_info = request
444            .manifest_info
445            .context(error::MissingRequiredFieldSnafu {
446                name: "manifest_info",
447            })?;
448
449        let manifest_info = match manifest_info {
450            ManifestInfo::MitoManifestInfo(info) => {
451                RegionManifestInfo::mito(info.data_manifest_version, 0)
452            }
453            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
454                info.data_manifest_version,
455                0,
456                info.metadata_manifest_version,
457                0,
458            ),
459        };
460
461        let tracing_context = TracingContext::from_current_span();
462        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
463
464        self.sync_region(region_id, manifest_info)
465            .trace(span)
466            .await
467            .map(|_| RegionResponse::new(AffectedRows::default()))
468    }
469
470    /// Sync region manifest and registers new opened logical regions.
471    pub async fn sync_region(
472        &self,
473        region_id: RegionId,
474        manifest_info: RegionManifestInfo,
475    ) -> Result<()> {
476        let engine_with_status = self
477            .inner
478            .region_map
479            .get(&region_id)
480            .with_context(|| RegionNotFoundSnafu { region_id })?;
481
482        self.inner
483            .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
484            .await
485    }
486}
487
488#[async_trait]
489impl RegionServerHandler for RegionServer {
490    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
491        let response = match &request {
492            region_request::Body::Creates(_)
493            | region_request::Body::Drops(_)
494            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
495            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
496                self.handle_requests_in_parallel(request).await
497            }
498            region_request::Body::Sync(sync_request) => {
499                self.handle_sync_region_request(sync_request).await
500            }
501            _ => self.handle_requests_in_serial(request).await,
502        }
503        .map_err(BoxedError::new)
504        .context(ExecuteGrpcRequestSnafu)?;
505
506        Ok(RegionResponseV1 {
507            header: Some(ResponseHeader {
508                status: Some(Status {
509                    status_code: StatusCode::Success as _,
510                    ..Default::default()
511                }),
512            }),
513            affected_rows: response.affected_rows as _,
514            extensions: response.extensions,
515        })
516    }
517}
518
519#[async_trait]
520impl FlightCraft for RegionServer {
521    async fn do_get(
522        &self,
523        request: Request<Ticket>,
524    ) -> TonicResult<Response<TonicStream<FlightData>>> {
525        let ticket = request.into_inner().ticket;
526        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
527            .context(servers_error::InvalidFlightTicketSnafu)?;
528        let tracing_context = request
529            .header
530            .as_ref()
531            .map(|h| TracingContext::from_w3c(&h.tracing_context))
532            .unwrap_or_default();
533
534        let result = self
535            .handle_remote_read(request)
536            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
537            .await?;
538
539        let stream = Box::pin(FlightRecordBatchStream::new(result, tracing_context));
540        Ok(Response::new(stream))
541    }
542}
543
/// A region's engine tagged with the lifecycle state of the region.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
553
554impl RegionEngineWithStatus {
555    /// Returns [RegionEngineRef].
556    pub fn into_engine(self) -> RegionEngineRef {
557        match self {
558            RegionEngineWithStatus::Registering(engine) => engine,
559            RegionEngineWithStatus::Deregistering(engine) => engine,
560            RegionEngineWithStatus::Ready(engine) => engine,
561        }
562    }
563
564    /// Returns [RegionEngineRef] reference.
565    pub fn engine(&self) -> &RegionEngineRef {
566        match self {
567            RegionEngineWithStatus::Registering(engine) => engine,
568            RegionEngineWithStatus::Deregistering(engine) => engine,
569            RegionEngineWithStatus::Ready(engine) => engine,
570        }
571    }
572}
573
574impl Deref for RegionEngineWithStatus {
575    type Target = RegionEngineRef;
576
577    fn deref(&self) -> &Self::Target {
578        match self {
579            RegionEngineWithStatus::Registering(engine) => engine,
580            RegionEngineWithStatus::Deregistering(engine) => engine,
581            RegionEngineWithStatus::Ready(engine) => engine,
582        }
583    }
584}
585
/// Shared state behind [`RegionServer`].
struct RegionServerInner {
    // Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    // Known regions with their engine and lifecycle status.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    // Query engine used to create plan decoders for remote reads.
    query_engine: QueryEngineRef,
    // Runtime handed out through `RegionServer::runtime`.
    runtime: Runtime,
    // Listener for region server events; its use is not shown in this part of the file.
    event_listener: RegionServerEventListenerRef,
    // Factory that builds table providers for regions.
    table_provider_factory: TableProviderFactoryRef,
    // The number of queries allowed to be executed at the same time.
    // Act as last line of defense on datanode to prevent query overloading.
    // `None` means no limit is enforced.
    parallelism: Option<RegionServerParallelism>,
}
597
/// Limiter for concurrently running queries.
struct RegionServerParallelism {
    // One permit per query allowed to run at the same time.
    semaphore: Semaphore,
    // Maximum time to wait for a permit.
    timeout: Duration,
}
602
603impl RegionServerParallelism {
604    pub fn from_opts(
605        max_concurrent_queries: usize,
606        concurrent_query_limiter_timeout: Duration,
607    ) -> Option<Self> {
608        if max_concurrent_queries == 0 {
609            return None;
610        }
611        Some(RegionServerParallelism {
612            semaphore: Semaphore::new(max_concurrent_queries),
613            timeout: concurrent_query_limiter_timeout,
614        })
615    }
616
617    pub async fn acquire(&self) -> Result<SemaphorePermit> {
618        timeout(self.timeout, self.semaphore.acquire())
619            .await
620            .context(ConcurrentQueryLimiterTimeoutSnafu)?
621            .context(ConcurrentQueryLimiterClosedSnafu)
622    }
623}
624
/// Outcome of looking up the engine for a region change.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// No engine call is needed; the caller can answer right away with this
    /// number of affected rows.
    EarlyReturn(AffectedRows),
}
629
630impl Debug for CurrentEngine {
631    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632        match self {
633            CurrentEngine::Engine(engine) => f
634                .debug_struct("CurrentEngine")
635                .field("engine", &engine.name())
636                .finish(),
637            CurrentEngine::EarlyReturn(rows) => f
638                .debug_struct("CurrentEngine")
639                .field("return", rows)
640                .finish(),
641        }
642    }
643}
644
645impl RegionServerInner {
646    pub fn new(
647        query_engine: QueryEngineRef,
648        runtime: Runtime,
649        event_listener: RegionServerEventListenerRef,
650        table_provider_factory: TableProviderFactoryRef,
651        parallelism: Option<RegionServerParallelism>,
652    ) -> Self {
653        Self {
654            engines: RwLock::new(HashMap::new()),
655            region_map: DashMap::new(),
656            query_engine,
657            runtime,
658            event_listener,
659            table_provider_factory,
660            parallelism,
661        }
662    }
663
664    pub fn register_engine(&self, engine: RegionEngineRef) {
665        let engine_name = engine.name();
666        info!("Region Engine {engine_name} is registered");
667        self.engines
668            .write()
669            .unwrap()
670            .insert(engine_name.to_string(), engine);
671    }
672
673    fn get_engine(
674        &self,
675        region_id: RegionId,
676        region_change: &RegionChange,
677    ) -> Result<CurrentEngine> {
678        let current_region_status = self.region_map.get(&region_id);
679
680        let engine = match region_change {
681            RegionChange::Register(attribute) => match current_region_status {
682                Some(status) => match status.clone() {
683                    RegionEngineWithStatus::Registering(engine) => engine,
684                    RegionEngineWithStatus::Deregistering(_) => {
685                        return error::RegionBusySnafu { region_id }.fail()
686                    }
687                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
688                },
689                _ => self
690                    .engines
691                    .read()
692                    .unwrap()
693                    .get(attribute.engine())
694                    .with_context(|| RegionEngineNotFoundSnafu {
695                        name: attribute.engine(),
696                    })?
697                    .clone(),
698            },
699            RegionChange::Deregisters => match current_region_status {
700                Some(status) => match status.clone() {
701                    RegionEngineWithStatus::Registering(_) => {
702                        return error::RegionBusySnafu { region_id }.fail()
703                    }
704                    RegionEngineWithStatus::Deregistering(_) => {
705                        return Ok(CurrentEngine::EarlyReturn(0))
706                    }
707                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
708                },
709                None => return Ok(CurrentEngine::EarlyReturn(0)),
710            },
711            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
712                match current_region_status {
713                    Some(status) => match status.clone() {
714                        RegionEngineWithStatus::Registering(_) => {
715                            return error::RegionNotReadySnafu { region_id }.fail()
716                        }
717                        RegionEngineWithStatus::Deregistering(_) => {
718                            return error::RegionNotFoundSnafu { region_id }.fail()
719                        }
720                        RegionEngineWithStatus::Ready(engine) => engine,
721                    },
722                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
723                }
724            }
725        };
726
727        Ok(CurrentEngine::Engine(engine))
728    }
729
    /// Opens a batch of regions that all belong to `engine`.
    ///
    /// Each request is first turned into a `Register` region change and the
    /// region status is updated before the engine is called. Afterwards the
    /// status of every region is either promoted to ready or unset again,
    /// depending on the per-region result.
    ///
    /// Returns the ids of the regions that were opened. If anything failed, an
    /// `Unexpected` error carrying the debug form of the first collected error
    /// is returned instead, even when some regions were opened.
    async fn handle_batch_open_requests_inner(
        &self,
        engine: RegionEngineRef,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<RegionId>> {
        // Derive the region change for every request up front; a request whose
        // attribute cannot be parsed fails the whole batch before any status changes.
        let region_changes = requests
            .iter()
            .map(|(region_id, open)| {
                let attribute = parse_region_attribute(&open.engine, &open.options)?;
                Ok((*region_id, RegionChange::Register(attribute)))
            })
            .collect::<Result<HashMap<_, _>>>()?;

        // Presumably marks the regions as registering — helper not shown here.
        for (&region_id, region_change) in &region_changes {
            self.set_region_status_not_ready(region_id, &engine, region_change)
        }

        let mut open_regions = Vec::with_capacity(requests.len());
        let mut errors = vec![];
        match engine
            .handle_batch_open_requests(parallelism, requests)
            .await
            .with_context(|_| HandleBatchOpenRequestSnafu)
        {
            // The engine call itself succeeded; inspect each region's own result.
            Ok(results) => {
                for (region_id, result) in results {
                    let region_change = &region_changes[&region_id];
                    match result {
                        Ok(_) => {
                            if let Err(e) = self
                                .set_region_status_ready(region_id, engine.clone(), *region_change)
                                .await
                            {
                                error!(e; "Failed to set region to ready: {}", region_id);
                                errors.push(BoxedError::new(e));
                            } else {
                                open_regions.push(region_id)
                            }
                        }
                        Err(e) => {
                            // Roll back the status of the region that failed to open.
                            self.unset_region_status(region_id, &engine, *region_change);
                            error!(e; "Failed to open region: {}", region_id);
                            errors.push(e);
                        }
                    }
                }
            }
            // The whole batch failed; roll back the status of every region in it.
            Err(e) => {
                for (&region_id, region_change) in &region_changes {
                    self.unset_region_status(region_id, &engine, *region_change);
                }
                error!(e; "Failed to open batch regions");
                errors.push(BoxedError::new(e));
            }
        }

        if !errors.is_empty() {
            return error::UnexpectedSnafu {
                // Returns the first error.
                violated: format!("Failed to open batch regions: {:?}", errors[0]),
            }
            .fail();
        }

        Ok(open_regions)
    }
797
798    pub async fn handle_batch_open_requests(
799        &self,
800        parallelism: usize,
801        requests: Vec<(RegionId, RegionOpenRequest)>,
802    ) -> Result<Vec<RegionId>> {
803        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
804            HashMap::with_capacity(requests.len());
805        for (region_id, request) in requests {
806            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
807                requests.push((region_id, request));
808            } else {
809                engine_grouped_requests
810                    .insert(request.engine.to_string(), vec![(region_id, request)]);
811            }
812        }
813
814        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
815        for (engine, requests) in engine_grouped_requests {
816            let engine = self
817                .engines
818                .read()
819                .unwrap()
820                .get(&engine)
821                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
822                .clone();
823            results.push(
824                self.handle_batch_open_requests_inner(engine, parallelism, requests)
825                    .await,
826            )
827        }
828
829        Ok(results
830            .into_iter()
831            .collect::<Result<Vec<_>>>()?
832            .into_iter()
833            .flatten()
834            .collect::<Vec<_>>())
835    }
836
837    // Handle requests in batch.
838    //
839    // limitation: all create requests must be in the same engine.
840    pub async fn handle_batch_request(
841        &self,
842        batch_request: BatchRegionDdlRequest,
843    ) -> Result<RegionResponse> {
844        let region_changes = match &batch_request {
845            BatchRegionDdlRequest::Create(requests) => requests
846                .iter()
847                .map(|(region_id, create)| {
848                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
849                    Ok((*region_id, RegionChange::Register(attribute)))
850                })
851                .collect::<Result<Vec<_>>>()?,
852            BatchRegionDdlRequest::Drop(requests) => requests
853                .iter()
854                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
855                .collect::<Vec<_>>(),
856            BatchRegionDdlRequest::Alter(requests) => requests
857                .iter()
858                .map(|(region_id, _)| (*region_id, RegionChange::None))
859                .collect::<Vec<_>>(),
860        };
861
862        // The ddl procedure will ensure all requests are in the same engine.
863        // Therefore, we can get the engine from the first request.
864        let (first_region_id, first_region_change) = region_changes.first().unwrap();
865        let engine = match self.get_engine(*first_region_id, first_region_change)? {
866            CurrentEngine::Engine(engine) => engine,
867            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
868        };
869
870        for (region_id, region_change) in region_changes.iter() {
871            self.set_region_status_not_ready(*region_id, &engine, region_change);
872        }
873
874        let ddl_type = batch_request.request_type();
875        let result = engine
876            .handle_batch_ddl_requests(batch_request)
877            .await
878            .context(HandleBatchDdlRequestSnafu { ddl_type });
879
880        match result {
881            Ok(result) => {
882                for (region_id, region_change) in &region_changes {
883                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
884                        .await?;
885                }
886
887                Ok(RegionResponse {
888                    affected_rows: result.affected_rows,
889                    extensions: result.extensions,
890                })
891            }
892            Err(err) => {
893                for (region_id, region_change) in region_changes {
894                    self.unset_region_status(region_id, &engine, region_change);
895                }
896
897                Err(err)
898            }
899        }
900    }
901
902    pub async fn handle_request(
903        &self,
904        region_id: RegionId,
905        request: RegionRequest,
906    ) -> Result<RegionResponse> {
907        let request_type = request.request_type();
908        let region_id_str = region_id.to_string();
909        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
910            .with_label_values(&[&region_id_str, request_type])
911            .start_timer();
912
913        let region_change = match &request {
914            RegionRequest::Create(create) => {
915                let attribute = parse_region_attribute(&create.engine, &create.options)?;
916                RegionChange::Register(attribute)
917            }
918            RegionRequest::Open(open) => {
919                let attribute = parse_region_attribute(&open.engine, &open.options)?;
920                RegionChange::Register(attribute)
921            }
922            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
923            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
924                RegionChange::Ingest
925            }
926            RegionRequest::Alter(_)
927            | RegionRequest::Flush(_)
928            | RegionRequest::Compact(_)
929            | RegionRequest::Truncate(_) => RegionChange::None,
930            RegionRequest::Catchup(_) => RegionChange::Catchup,
931        };
932
933        let engine = match self.get_engine(region_id, &region_change)? {
934            CurrentEngine::Engine(engine) => engine,
935            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
936        };
937
938        // Sets corresponding region status to registering/deregistering before the operation.
939        self.set_region_status_not_ready(region_id, &engine, &region_change);
940
941        match engine
942            .handle_request(region_id, request)
943            .await
944            .with_context(|_| HandleRegionRequestSnafu { region_id })
945        {
946            Ok(result) => {
947                // Update metrics
948                if matches!(region_change, RegionChange::Ingest) {
949                    crate::metrics::REGION_CHANGED_ROW_COUNT
950                        .with_label_values(&[&region_id_str, request_type])
951                        .inc_by(result.affected_rows as u64);
952                }
953                // Sets corresponding region status to ready.
954                self.set_region_status_ready(region_id, engine.clone(), region_change)
955                    .await?;
956
957                Ok(RegionResponse {
958                    affected_rows: result.affected_rows,
959                    extensions: result.extensions,
960                })
961            }
962            Err(err) => {
963                // Removes the region status if the operation fails.
964                self.unset_region_status(region_id, &engine, region_change);
965                Err(err)
966            }
967        }
968    }
969
970    /// Handles the sync region request.
971    pub async fn handle_sync_region(
972        &self,
973        engine: &RegionEngineRef,
974        region_id: RegionId,
975        manifest_info: RegionManifestInfo,
976    ) -> Result<()> {
977        let Some(new_opened_regions) = engine
978            .sync_region(region_id, manifest_info)
979            .await
980            .with_context(|_| HandleRegionRequestSnafu { region_id })?
981            .new_opened_logical_region_ids()
982        else {
983            warn!("No new opened logical regions");
984            return Ok(());
985        };
986
987        for region in new_opened_regions {
988            self.region_map
989                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
990            info!("Logical region {} is registered!", region);
991        }
992
993        Ok(())
994    }
995
996    fn set_region_status_not_ready(
997        &self,
998        region_id: RegionId,
999        engine: &RegionEngineRef,
1000        region_change: &RegionChange,
1001    ) {
1002        match region_change {
1003            RegionChange::Register(_) => {
1004                self.region_map.insert(
1005                    region_id,
1006                    RegionEngineWithStatus::Registering(engine.clone()),
1007                );
1008            }
1009            RegionChange::Deregisters => {
1010                self.region_map.insert(
1011                    region_id,
1012                    RegionEngineWithStatus::Deregistering(engine.clone()),
1013                );
1014            }
1015            _ => {}
1016        }
1017    }
1018
1019    fn unset_region_status(
1020        &self,
1021        region_id: RegionId,
1022        engine: &RegionEngineRef,
1023        region_change: RegionChange,
1024    ) {
1025        match region_change {
1026            RegionChange::None | RegionChange::Ingest => {}
1027            RegionChange::Register(_) => {
1028                self.region_map.remove(&region_id);
1029            }
1030            RegionChange::Deregisters => {
1031                self.region_map
1032                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1033            }
1034            RegionChange::Catchup => {}
1035        }
1036    }
1037
1038    async fn set_region_status_ready(
1039        &self,
1040        region_id: RegionId,
1041        engine: RegionEngineRef,
1042        region_change: RegionChange,
1043    ) -> Result<()> {
1044        let engine_type = engine.name();
1045        match region_change {
1046            RegionChange::None | RegionChange::Ingest => {}
1047            RegionChange::Register(attribute) => {
1048                info!(
1049                    "Region {region_id} is registered to engine {}",
1050                    attribute.engine()
1051                );
1052                self.region_map
1053                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1054
1055                match attribute {
1056                    RegionAttribute::Metric { physical } => {
1057                        if physical {
1058                            // Registers the logical regions belong to the physical region (`region_id`).
1059                            self.register_logical_regions(&engine, region_id).await?;
1060                            // We only send the `on_region_registered` event of the physical region.
1061                            self.event_listener.on_region_registered(region_id);
1062                        }
1063                    }
1064                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1065                    RegionAttribute::File => {
1066                        // do nothing
1067                    }
1068                }
1069            }
1070            RegionChange::Deregisters => {
1071                info!("Region {region_id} is deregistered from engine {engine_type}");
1072                self.region_map
1073                    .remove(&region_id)
1074                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1075                self.event_listener.on_region_deregistered(region_id);
1076            }
1077            RegionChange::Catchup => {
1078                if is_metric_engine(engine.name()) {
1079                    // Registers the logical regions belong to the physical region (`region_id`).
1080                    self.register_logical_regions(&engine, region_id).await?;
1081                }
1082            }
1083        }
1084        Ok(())
1085    }
1086
1087    async fn register_logical_regions(
1088        &self,
1089        engine: &RegionEngineRef,
1090        physical_region_id: RegionId,
1091    ) -> Result<()> {
1092        let metric_engine =
1093            engine
1094                .as_any()
1095                .downcast_ref::<MetricEngine>()
1096                .context(UnexpectedSnafu {
1097                    violated: format!(
1098                        "expecting engine type '{}', actual '{}'",
1099                        METRIC_ENGINE_NAME,
1100                        engine.name(),
1101                    ),
1102                })?;
1103
1104        let logical_regions = metric_engine
1105            .logical_regions(physical_region_id)
1106            .await
1107            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1108
1109        for region in logical_regions {
1110            self.region_map
1111                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1112            info!("Logical region {} is registered!", region);
1113        }
1114        Ok(())
1115    }
1116
1117    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
1118        // TODO(ruihang): add metrics and set trace id
1119
1120        // Build query context from gRPC header
1121        let query_ctx: QueryContextRef = request
1122            .header
1123            .as_ref()
1124            .map(|h| Arc::new(h.into()))
1125            .unwrap_or_else(|| QueryContextBuilder::default().build().into());
1126
1127        let result = self
1128            .query_engine
1129            .execute(request.plan, query_ctx)
1130            .await
1131            .context(ExecuteLogicalPlanSnafu)?;
1132
1133        match result.data {
1134            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1135                UnsupportedOutputSnafu { expected: "stream" }.fail()
1136            }
1137            OutputData::Stream(stream) => Ok(stream),
1138        }
1139    }
1140
1141    async fn stop(&self) -> Result<()> {
1142        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1143        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1144        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1145        //
1146        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1147        // it here, collect the values first then use later separately.
1148
1149        let regions = self
1150            .region_map
1151            .iter()
1152            .map(|x| (*x.key(), x.value().clone()))
1153            .collect::<Vec<_>>();
1154        let num_regions = regions.len();
1155
1156        for (region_id, engine) in regions {
1157            let closed = engine
1158                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1159                .await;
1160            match closed {
1161                Ok(_) => debug!("Region {region_id} is closed"),
1162                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1163            }
1164        }
1165        self.region_map.clear();
1166        info!("closed {num_regions} regions");
1167
1168        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1169        for (engine_name, engine) in engines {
1170            engine
1171                .stop()
1172                .await
1173                .context(StopRegionEngineSnafu { name: &engine_name })?;
1174            info!("Region engine {engine_name} is stopped");
1175        }
1176
1177        Ok(())
1178    }
1179}
1180
/// How a request affects the registration of a region in the region server.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the registration of the region
    /// (alter, flush, compact and truncate requests).
    None,
    /// The request registers the region (create and open requests).
    Register(RegionAttribute),
    /// The request deregisters the region (close and drop requests).
    Deregisters,
    /// The request is a catchup request.
    Catchup,
    /// The request writes data to the region (put, delete and bulk insert requests).
    Ingest,
}
1189
/// Returns `true` if `engine` is the name of the metric engine.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1193
1194fn parse_region_attribute(
1195    engine: &str,
1196    options: &HashMap<String, String>,
1197) -> Result<RegionAttribute> {
1198    match engine {
1199        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1200        METRIC_ENGINE_NAME => {
1201            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1202
1203            Ok(RegionAttribute::Metric { physical })
1204        }
1205        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1206        _ => error::UnexpectedSnafu {
1207            violated: format!("Unknown engine: {}", engine),
1208        }
1209        .fail(),
1210    }
1211}
1212
/// The engine a region belongs to, plus the engine-specific traits the region server needs
/// when registering the region.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region of the mito engine.
    Mito,
    /// A region of the metric engine; `physical` is `false` for logical regions.
    Metric { physical: bool },
    /// A region of the file engine.
    File,
}
1219
1220impl RegionAttribute {
1221    fn engine(&self) -> &'static str {
1222        match self {
1223            RegionAttribute::Mito => MITO_ENGINE_NAME,
1224            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1225            RegionAttribute::File => FILE_ENGINE_NAME,
1226        }
1227    }
1228}
1229
1230#[cfg(test)]
1231mod tests {
1232
1233    use std::assert_matches::assert_matches;
1234
1235    use common_error::ext::ErrorExt;
1236    use mito2::test_util::CreateRequestBuilder;
1237    use store_api::region_engine::RegionEngine;
1238    use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
1239    use store_api::storage::RegionId;
1240
1241    use super::*;
1242    use crate::error::Result;
1243    use crate::tests::{mock_region_server, MockRegionEngine};
1244
1245    #[tokio::test]
1246    async fn test_region_registering() {
1247        common_telemetry::init_default_ut_logging();
1248
1249        let mut mock_region_server = mock_region_server();
1250        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1251        let engine_name = engine.name();
1252        mock_region_server.register_engine(engine.clone());
1253        let region_id = RegionId::new(1, 1);
1254        let builder = CreateRequestBuilder::new();
1255        let create_req = builder.build();
1256        // Tries to create/open a registering region.
1257        mock_region_server.inner.region_map.insert(
1258            region_id,
1259            RegionEngineWithStatus::Registering(engine.clone()),
1260        );
1261        let response = mock_region_server
1262            .handle_request(region_id, RegionRequest::Create(create_req))
1263            .await
1264            .unwrap();
1265        assert_eq!(response.affected_rows, 0);
1266        let status = mock_region_server
1267            .inner
1268            .region_map
1269            .get(&region_id)
1270            .unwrap()
1271            .clone();
1272        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1273
1274        mock_region_server.inner.region_map.insert(
1275            region_id,
1276            RegionEngineWithStatus::Registering(engine.clone()),
1277        );
1278        let response = mock_region_server
1279            .handle_request(
1280                region_id,
1281                RegionRequest::Open(RegionOpenRequest {
1282                    engine: engine_name.to_string(),
1283                    region_dir: String::new(),
1284                    options: Default::default(),
1285                    skip_wal_replay: false,
1286                }),
1287            )
1288            .await
1289            .unwrap();
1290        assert_eq!(response.affected_rows, 0);
1291        let status = mock_region_server
1292            .inner
1293            .region_map
1294            .get(&region_id)
1295            .unwrap()
1296            .clone();
1297        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1298    }
1299
1300    #[tokio::test]
1301    async fn test_region_deregistering() {
1302        common_telemetry::init_default_ut_logging();
1303
1304        let mut mock_region_server = mock_region_server();
1305        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1306
1307        mock_region_server.register_engine(engine.clone());
1308
1309        let region_id = RegionId::new(1, 1);
1310
1311        // Tries to drop/close a registering region.
1312        mock_region_server.inner.region_map.insert(
1313            region_id,
1314            RegionEngineWithStatus::Deregistering(engine.clone()),
1315        );
1316
1317        let response = mock_region_server
1318            .handle_request(
1319                region_id,
1320                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1321            )
1322            .await
1323            .unwrap();
1324        assert_eq!(response.affected_rows, 0);
1325
1326        let status = mock_region_server
1327            .inner
1328            .region_map
1329            .get(&region_id)
1330            .unwrap()
1331            .clone();
1332        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1333
1334        mock_region_server.inner.region_map.insert(
1335            region_id,
1336            RegionEngineWithStatus::Deregistering(engine.clone()),
1337        );
1338
1339        let response = mock_region_server
1340            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1341            .await
1342            .unwrap();
1343        assert_eq!(response.affected_rows, 0);
1344
1345        let status = mock_region_server
1346            .inner
1347            .region_map
1348            .get(&region_id)
1349            .unwrap()
1350            .clone();
1351        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1352    }
1353
1354    #[tokio::test]
1355    async fn test_region_not_ready() {
1356        common_telemetry::init_default_ut_logging();
1357
1358        let mut mock_region_server = mock_region_server();
1359        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1360
1361        mock_region_server.register_engine(engine.clone());
1362
1363        let region_id = RegionId::new(1, 1);
1364
1365        // Tries to drop/close a registering region.
1366        mock_region_server.inner.region_map.insert(
1367            region_id,
1368            RegionEngineWithStatus::Registering(engine.clone()),
1369        );
1370
1371        let err = mock_region_server
1372            .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
1373            .await
1374            .unwrap_err();
1375
1376        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1377    }
1378
1379    #[tokio::test]
1380    async fn test_region_request_failed() {
1381        common_telemetry::init_default_ut_logging();
1382
1383        let mut mock_region_server = mock_region_server();
1384        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1385            MITO_ENGINE_NAME,
1386            Box::new(|_region_id, _request| {
1387                error::UnexpectedSnafu {
1388                    violated: "test".to_string(),
1389                }
1390                .fail()
1391            }),
1392        );
1393
1394        mock_region_server.register_engine(engine.clone());
1395
1396        let region_id = RegionId::new(1, 1);
1397        let builder = CreateRequestBuilder::new();
1398        let create_req = builder.build();
1399        mock_region_server
1400            .handle_request(region_id, RegionRequest::Create(create_req))
1401            .await
1402            .unwrap_err();
1403
1404        let status = mock_region_server.inner.region_map.get(&region_id);
1405        assert!(status.is_none());
1406
1407        mock_region_server
1408            .inner
1409            .region_map
1410            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1411
1412        mock_region_server
1413            .handle_request(
1414                region_id,
1415                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1416            )
1417            .await
1418            .unwrap_err();
1419
1420        let status = mock_region_server.inner.region_map.get(&region_id);
1421        assert!(status.is_some());
1422    }
1423
    /// One case of `test_current_engine`.
    struct CurrentEngineTest {
        /// The region to look up.
        region_id: RegionId,
        /// The status the region has before the lookup; `None` means the region is absent from
        /// the region map.
        current_region_status: Option<RegionEngineWithStatus>,
        /// The change passed to `get_engine`.
        region_change: RegionChange,
        /// Checks the result returned by `get_engine`.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
1430
    /// Checks the result of `get_engine` for every tested combination of the current region
    /// status and the requested region change.
    #[tokio::test]
    async fn test_current_engine() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1024, 1);
        let tests = vec![
            // RegionChange::None
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            // RegionChange::Register
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // RegionChange::Deregisters
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
        ];

        for test in tests {
            let CurrentEngineTest {
                region_id,
                current_region_status,
                region_change,
                assert,
            } = test;

            // Sets up the region map: either install the given status or make sure the region
            // is absent.
            if let Some(status) = current_region_status {
                mock_region_server
                    .inner
                    .region_map
                    .insert(region_id, status);
            } else {
                mock_region_server.inner.region_map.remove(&region_id);
            }

            let result = mock_region_server
                .inner
                .get_engine(region_id, &region_change);

            // Runs the case-specific check on the lookup result.
            assert(result);
        }
    }
1579
1580    #[tokio::test]
1581    async fn test_region_server_parallelism() {
1582        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1583        let first_query = p.acquire().await;
1584        assert!(first_query.is_ok());
1585        let second_query = p.acquire().await;
1586        assert!(second_query.is_ok());
1587        let third_query = p.acquire().await;
1588        assert!(third_query.is_err());
1589        let err = third_query.unwrap_err();
1590        assert_eq!(
1591            err.output_msg(),
1592            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1593        );
1594        drop(first_query);
1595        let forth_query = p.acquire().await;
1596        assert!(forth_query.is_ok());
1597    }
1598}