datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use futures_util::future::try_join_all;
48use metric_engine::engine::MetricEngine;
49use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
50use prost::Message;
51use query::QueryEngineRef;
52pub use query::dummy_catalog::{
53    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
54};
55use serde_json;
56use servers::error::{
57    self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
58};
59use servers::grpc::FlightCompression;
60use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
61use servers::grpc::region_server::RegionServerHandler;
62use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
63use snafu::{OptionExt, ResultExt, ensure};
64use store_api::metric_engine_consts::{
65    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
66};
67use store_api::region_engine::{
68    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
69    SettableRegionRoleState,
70};
71use store_api::region_request::{
72    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
73    RegionOpenRequest, RegionRequest,
74};
75use store_api::storage::RegionId;
76use tokio::sync::{Semaphore, SemaphorePermit};
77use tokio::time::timeout;
78use tonic::{Request, Response, Result as TonicResult};
79
80use crate::error::{
81    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
82    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
83    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
84    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
85    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
86    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
87};
88use crate::event_listener::RegionServerEventListenerRef;
89use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
90
/// Cloneable handle to the region server.
///
/// Clones share the same [`RegionServerInner`] and the same suspend flag,
/// because both are held behind an [`Arc`].
#[derive(Clone)]
pub struct RegionServer {
    /// Shared state: registered engines, the region map and query facilities.
    inner: Arc<RegionServerInner>,
    /// Compression setting passed to the Flight record batch stream in `do_get`.
    flight_compression: FlightCompression,
    /// While this flag is `true`, `do_get` rejects requests with a suspended error.
    suspend: Arc<AtomicBool>,
}
97
/// Summary of one region, as produced by [`RegionServer::reportable_regions`].
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine that serves the region.
    pub engine: String,
    /// Role the engine reported for the region.
    pub role: RegionRole,
}
103
104impl RegionServer {
105    pub fn new(
106        query_engine: QueryEngineRef,
107        runtime: Runtime,
108        event_listener: RegionServerEventListenerRef,
109        flight_compression: FlightCompression,
110    ) -> Self {
111        Self::with_table_provider(
112            query_engine,
113            runtime,
114            event_listener,
115            Arc::new(DummyTableProviderFactory),
116            0,
117            Duration::from_millis(0),
118            flight_compression,
119        )
120    }
121
122    pub fn with_table_provider(
123        query_engine: QueryEngineRef,
124        runtime: Runtime,
125        event_listener: RegionServerEventListenerRef,
126        table_provider_factory: TableProviderFactoryRef,
127        max_concurrent_queries: usize,
128        concurrent_query_limiter_timeout: Duration,
129        flight_compression: FlightCompression,
130    ) -> Self {
131        Self {
132            inner: Arc::new(RegionServerInner::new(
133                query_engine,
134                runtime,
135                event_listener,
136                table_provider_factory,
137                RegionServerParallelism::from_opts(
138                    max_concurrent_queries,
139                    concurrent_query_limiter_timeout,
140                ),
141            )),
142            flight_compression,
143            suspend: Arc::new(AtomicBool::new(false)),
144        }
145    }
146
    /// Registers an engine.
    ///
    /// Delegates to the shared inner state, so the engine is visible to every
    /// clone of this server.
    pub fn register_engine(&mut self, engine: RegionEngineRef) {
        self.inner.register_engine(engine);
    }
151
    /// Sets the topic stats reporter on the shared inner state.
    ///
    /// The reporter is what [`RegionServer::topic_stats`] later queries.
    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
        self.inner.set_topic_stats_reporter(topic_stats_reporter);
    }
156
157    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
158    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
159        match self.inner.get_engine(region_id, &RegionChange::None) {
160            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
161            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
162            Err(error::Error::RegionNotFound { .. }) => Ok(None),
163            Err(err) => Err(err),
164        }
165    }
166
167    /// Gets the MitoEngine if it's registered.
168    pub fn mito_engine(&self) -> Option<MitoEngine> {
169        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
170            Some(mito)
171        } else {
172            self.inner
173                .engines
174                .read()
175                .unwrap()
176                .get(MITO_ENGINE_NAME)
177                .cloned()
178                .and_then(|e| {
179                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
180                    if mito.is_none() {
181                        warn!("Mito engine not found in region server engines");
182                    }
183                    mito
184                })
185        }
186    }
187
    /// Forwards a batch of region open requests to the inner server and
    /// returns the region ids it reports.
    ///
    /// `parallelism` and `ignore_nonexistent_region` are passed through
    /// unchanged; their exact effect is defined by the inner implementation.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
        ignore_nonexistent_region: bool,
    ) -> Result<Vec<RegionId>> {
        self.inner
            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
            .await
    }
199
    /// Forwards a batch of region catchup requests to the inner server.
    ///
    /// On success, the returned list pairs each region id with that region's
    /// own result.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
    }
210
    /// Handles a single region request by delegating to the inner server.
    ///
    /// The tracing span records the request type.
    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
    pub async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse> {
        self.inner.handle_request(region_id, request).await
    }
219
220    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
221    async fn table_provider(
222        &self,
223        region_id: RegionId,
224        ctx: Option<QueryContextRef>,
225    ) -> Result<Arc<dyn TableProvider>> {
226        let status = self
227            .inner
228            .region_map
229            .get(&region_id)
230            .context(RegionNotFoundSnafu { region_id })?
231            .clone();
232        ensure!(
233            matches!(status, RegionEngineWithStatus::Ready(_)),
234            RegionNotReadySnafu { region_id }
235        );
236
237        self.inner
238            .table_provider_factory
239            .create(region_id, status.into_engine(), ctx)
240            .await
241            .context(ExecuteLogicalPlanSnafu)
242    }
243
244    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
245    pub async fn handle_remote_read(
246        &self,
247        request: api::v1::region::QueryRequest,
248        query_ctx: QueryContextRef,
249    ) -> Result<SendableRecordBatchStream> {
250        let _permit = if let Some(p) = &self.inner.parallelism {
251            Some(p.acquire().await?)
252        } else {
253            None
254        };
255
256        let region_id = RegionId::from_u64(request.region_id);
257        let catalog_list = Arc::new(NameAwareCatalogList::new(
258            self.clone(),
259            region_id,
260            query_ctx.clone(),
261        ));
262
263        if query_ctx.explain_verbose() {
264            common_telemetry::info!("Handle remote read for region: {}", region_id);
265        }
266
267        let decoder = self
268            .inner
269            .query_engine
270            .engine_context(query_ctx.clone())
271            .new_plan_decoder()
272            .context(NewPlanDecoderSnafu)?;
273
274        let plan = decoder
275            .decode(Bytes::from(request.plan), catalog_list, false)
276            .await
277            .context(DecodeLogicalPlanSnafu)?;
278
279        self.inner
280            .handle_read(
281                QueryRequest {
282                    header: request.header,
283                    region_id,
284                    plan,
285                },
286                query_ctx,
287            )
288            .await
289    }
290
291    #[tracing::instrument(skip_all)]
292    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
293        let _permit = if let Some(p) = &self.inner.parallelism {
294            Some(p.acquire().await?)
295        } else {
296            None
297        };
298
299        let ctx = request.header.as_ref().map(|h| h.into());
300        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
301
302        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
303            .context(DataFusionSnafu)?;
304        let mut injector = injector_builder
305            .build(self, request.region_id, query_ctx.clone())
306            .await?;
307
308        let plan = request
309            .plan
310            .rewrite(&mut injector)
311            .context(DataFusionSnafu)?
312            .data;
313
314        self.inner
315            .handle_read(QueryRequest { plan, ..request }, query_ctx)
316            .await
317    }
318
319    /// Returns all opened and reportable regions.
320    ///
321    /// Notes: except all metrics regions.
322    pub fn reportable_regions(&self) -> Vec<RegionStat> {
323        self.inner
324            .region_map
325            .iter()
326            .filter_map(|e| {
327                let region_id = *e.key();
328                // Filters out any regions whose role equals None.
329                e.role(region_id).map(|role| RegionStat {
330                    region_id,
331                    engine: e.value().name().to_string(),
332                    role,
333                })
334            })
335            .collect()
336    }
337
338    /// Returns the reportable topics.
339    pub fn topic_stats(&self) -> Vec<TopicStat> {
340        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
341        let Some(reporter) = reporter.as_mut() else {
342            return vec![];
343        };
344        reporter
345            .reportable_topics()
346            .into_iter()
347            .map(|stat| TopicStat {
348                topic_name: stat.topic,
349                record_size: stat.record_size,
350                record_num: stat.record_num,
351                latest_entry_id: stat.latest_entry_id,
352            })
353            .collect()
354    }
355
356    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
357        self.inner.region_map.get(&region_id).and_then(|engine| {
358            engine.role(region_id).map(|role| match role {
359                RegionRole::Follower => false,
360                RegionRole::Leader => true,
361                RegionRole::DowngradingLeader => true,
362            })
363        })
364    }
365
366    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
367        let engine = self
368            .inner
369            .region_map
370            .get(&region_id)
371            .with_context(|| RegionNotFoundSnafu { region_id })?;
372        engine
373            .set_region_role(region_id, role)
374            .with_context(|_| HandleRegionRequestSnafu { region_id })
375    }
376
377    /// Set region role state gracefully.
378    ///
379    /// For [SettableRegionRoleState::Follower]:
380    /// After the call returns, the engine ensures that
381    /// no **further** write or flush operations will succeed in this region.
382    ///
383    /// For [SettableRegionRoleState::DowngradingLeader]:
384    /// After the call returns, the engine ensures that
385    /// no **further** write operations will succeed in this region.
386    pub async fn set_region_role_state_gracefully(
387        &self,
388        region_id: RegionId,
389        state: SettableRegionRoleState,
390    ) -> Result<SetRegionRoleStateResponse> {
391        match self.inner.region_map.get(&region_id) {
392            Some(engine) => Ok(engine
393                .set_region_role_state_gracefully(region_id, state)
394                .await
395                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
396            None => Ok(SetRegionRoleStateResponse::NotFound),
397        }
398    }
399
    /// Returns a clone of the runtime held by this server.
    pub fn runtime(&self) -> Runtime {
        self.inner.runtime.clone()
    }
403
404    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
405        match self.inner.region_map.get(&region_id) {
406            Some(e) => e.region_statistic(region_id),
407            None => None,
408        }
409    }
410
    /// Stop the region server.
    ///
    /// Delegates to the inner server's `stop` and returns its result.
    pub async fn stop(&self) -> Result<()> {
        self.inner.stop().await
    }
415
416    #[cfg(test)]
417    /// Registers a region for test purpose.
418    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
419        {
420            let mut engines = self.inner.engines.write().unwrap();
421            if !engines.contains_key(engine.name()) {
422                debug!("Registering test engine: {}", engine.name());
423                engines.insert(engine.name().to_string(), engine.clone());
424            }
425        }
426
427        self.inner
428            .region_map
429            .insert(region_id, RegionEngineWithStatus::Ready(engine));
430    }
431
432    async fn handle_batch_ddl_requests(
433        &self,
434        request: region_request::Body,
435    ) -> Result<RegionResponse> {
436        // Safety: we have already checked the request type in `RegionServer::handle()`.
437        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
438            .context(BuildRegionRequestsSnafu)?
439            .unwrap();
440        let tracing_context = TracingContext::from_current_span();
441
442        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
443        self.inner
444            .handle_batch_request(batch_request)
445            .trace(span)
446            .await
447    }
448
449    async fn handle_requests_in_parallel(
450        &self,
451        request: region_request::Body,
452    ) -> Result<RegionResponse> {
453        let requests =
454            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
455        let tracing_context = TracingContext::from_current_span();
456
457        let join_tasks = requests.into_iter().map(|(region_id, req)| {
458            let self_to_move = self;
459            let span = tracing_context.attach(info_span!(
460                "RegionServer::handle_region_request",
461                region_id = region_id.to_string()
462            ));
463            async move {
464                self_to_move
465                    .handle_request(region_id, req)
466                    .trace(span)
467                    .await
468            }
469        });
470
471        let results = try_join_all(join_tasks).await?;
472        let mut affected_rows = 0;
473        let mut extensions = HashMap::new();
474        for result in results {
475            affected_rows += result.affected_rows;
476            extensions.extend(result.extensions);
477        }
478
479        Ok(RegionResponse {
480            affected_rows,
481            extensions,
482            metadata: Vec::new(),
483        })
484    }
485
486    async fn handle_requests_in_serial(
487        &self,
488        request: region_request::Body,
489    ) -> Result<RegionResponse> {
490        let requests =
491            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
492        let tracing_context = TracingContext::from_current_span();
493
494        let mut affected_rows = 0;
495        let mut extensions = HashMap::new();
496        // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
497        // modify this part to avoid inefficient serial loop calls.
498        for (region_id, req) in requests {
499            let span = tracing_context.attach(info_span!(
500                "RegionServer::handle_region_request",
501                region_id = region_id.to_string()
502            ));
503            let result = self.handle_request(region_id, req).trace(span).await?;
504
505            affected_rows += result.affected_rows;
506            extensions.extend(result.extensions);
507        }
508
509        Ok(RegionResponse {
510            affected_rows,
511            extensions,
512            metadata: Vec::new(),
513        })
514    }
515
516    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
517        let region_id = RegionId::from_u64(request.region_id);
518        let manifest_info = request
519            .manifest_info
520            .context(error::MissingRequiredFieldSnafu {
521                name: "manifest_info",
522            })?;
523
524        let manifest_info = match manifest_info {
525            ManifestInfo::MitoManifestInfo(info) => {
526                RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
527            }
528            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
529                info.data_manifest_version,
530                0,
531                info.metadata_manifest_version,
532                0,
533            ),
534        };
535
536        let tracing_context = TracingContext::from_current_span();
537        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
538
539        self.sync_region(region_id, manifest_info)
540            .trace(span)
541            .await
542            .map(|_| RegionResponse::new(AffectedRows::default()))
543    }
544
545    /// Handles the ListMetadata request and retrieves metadata for specified regions.
546    ///
547    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
548    /// non-existing regions as `null`.
549    #[tracing::instrument(skip_all)]
550    async fn handle_list_metadata_request(
551        &self,
552        request: &ListMetadataRequest,
553    ) -> Result<RegionResponse> {
554        let mut region_metadatas = Vec::new();
555        // Collect metadata for each region
556        for region_id in &request.region_ids {
557            let region_id = RegionId::from_u64(*region_id);
558            // Get the engine.
559            let Some(engine) = self.find_engine(region_id)? else {
560                region_metadatas.push(None);
561                continue;
562            };
563
564            match engine.get_metadata(region_id).await {
565                Ok(metadata) => region_metadatas.push(Some(metadata)),
566                Err(err) => {
567                    if err.status_code() == StatusCode::RegionNotFound {
568                        region_metadatas.push(None);
569                    } else {
570                        Err(err).with_context(|_| GetRegionMetadataSnafu {
571                            engine: engine.name(),
572                            region_id,
573                        })?;
574                    }
575                }
576            }
577        }
578
579        // Serialize metadata to JSON
580        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
581
582        let response = RegionResponse::from_metadata(json_result);
583
584        Ok(response)
585    }
586
587    /// Sync region manifest and registers new opened logical regions.
588    pub async fn sync_region(
589        &self,
590        region_id: RegionId,
591        manifest_info: RegionManifestInfo,
592    ) -> Result<()> {
593        let engine_with_status = self
594            .inner
595            .region_map
596            .get(&region_id)
597            .with_context(|| RegionNotFoundSnafu { region_id })?;
598
599        self.inner
600            .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
601            .await
602    }
603
    /// Returns whether the suspend flag is currently set.
    ///
    /// Uses a relaxed load: only the flag value itself is read.
    fn is_suspended(&self) -> bool {
        self.suspend.load(Ordering::Relaxed)
    }
607
    /// Returns a shared handle to the suspend flag.
    ///
    /// Stores made through the returned handle are what `is_suspended` reads.
    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }
611}
612
613#[async_trait]
614impl RegionServerHandler for RegionServer {
615    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
616        let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
617            .with_label_values(&[request.as_ref()]);
618        let response = match &request {
619            region_request::Body::Creates(_)
620            | region_request::Body::Drops(_)
621            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
622            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
623                self.handle_requests_in_parallel(request).await
624            }
625            region_request::Body::Sync(sync_request) => {
626                self.handle_sync_region_request(sync_request).await
627            }
628            region_request::Body::ListMetadata(list_metadata_request) => {
629                self.handle_list_metadata_request(list_metadata_request)
630                    .await
631            }
632            _ => self.handle_requests_in_serial(request).await,
633        }
634        .map_err(BoxedError::new)
635        .inspect_err(|_| {
636            failed_requests_cnt.inc();
637        })
638        .context(ExecuteGrpcRequestSnafu)?;
639
640        Ok(RegionResponseV1 {
641            header: Some(ResponseHeader {
642                status: Some(Status {
643                    status_code: StatusCode::Success as _,
644                    ..Default::default()
645                }),
646            }),
647            affected_rows: response.affected_rows as _,
648            extensions: response.extensions,
649            metadata: response.metadata,
650        })
651    }
652}
653
654#[async_trait]
655impl FlightCraft for RegionServer {
656    async fn do_get(
657        &self,
658        request: Request<Ticket>,
659    ) -> TonicResult<Response<TonicStream<FlightData>>> {
660        ensure!(!self.is_suspended(), SuspendedSnafu);
661
662        let ticket = request.into_inner().ticket;
663        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
664            .context(servers_error::InvalidFlightTicketSnafu)?;
665        let tracing_context = request
666            .header
667            .as_ref()
668            .map(|h| TracingContext::from_w3c(&h.tracing_context))
669            .unwrap_or_default();
670        let query_ctx = request
671            .header
672            .as_ref()
673            .map(|h| Arc::new(QueryContext::from(h)))
674            .unwrap_or(QueryContext::arc());
675
676        let result = self
677            .handle_remote_read(request, query_ctx.clone())
678            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
679            .await?;
680
681        let stream = Box::pin(FlightRecordBatchStream::new(
682            result,
683            tracing_context,
684            self.flight_compression,
685            query_ctx,
686        ));
687        Ok(Response::new(stream))
688    }
689}
690
/// A region's engine together with the lifecycle status of the region.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
700
701impl RegionEngineWithStatus {
702    /// Returns [RegionEngineRef].
703    pub fn into_engine(self) -> RegionEngineRef {
704        match self {
705            RegionEngineWithStatus::Registering(engine) => engine,
706            RegionEngineWithStatus::Deregistering(engine) => engine,
707            RegionEngineWithStatus::Ready(engine) => engine,
708        }
709    }
710
711    /// Returns [RegionEngineRef] reference.
712    pub fn engine(&self) -> &RegionEngineRef {
713        match self {
714            RegionEngineWithStatus::Registering(engine) => engine,
715            RegionEngineWithStatus::Deregistering(engine) => engine,
716            RegionEngineWithStatus::Ready(engine) => engine,
717        }
718    }
719}
720
721impl Deref for RegionEngineWithStatus {
722    type Target = RegionEngineRef;
723
724    fn deref(&self) -> &Self::Target {
725        match self {
726            RegionEngineWithStatus::Registering(engine) => engine,
727            RegionEngineWithStatus::Deregistering(engine) => engine,
728            RegionEngineWithStatus::Ready(engine) => engine,
729        }
730    }
731}
732
/// State shared by all clones of a [`RegionServer`].
struct RegionServerInner {
    /// Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Engine and lifecycle status of every known region.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine; used to create plan decoders for remote reads.
    query_engine: QueryEngineRef,
    /// Runtime handed out by `RegionServer::runtime`.
    runtime: Runtime,
    /// Event listener supplied at construction.
    /// NOTE(review): presumably notified about region lifecycle events — its
    /// use is outside this view; confirm.
    event_listener: RegionServerEventListenerRef,
    /// Factory that creates the table provider for a region.
    table_provider_factory: TableProviderFactoryRef,
    /// The number of queries allowed to be executed at the same time.
    /// Act as last line of defense on datanode to prevent query overloading.
    parallelism: Option<RegionServerParallelism>,
    /// The topic stats reporter.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    /// server with a concrete engine; acceptable for now to fetch Mito-specific
    /// info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
}
750
/// Limiter for the number of concurrently executing queries.
struct RegionServerParallelism {
    /// Holds one permit per query allowed to run at the same time.
    semaphore: Semaphore,
    /// Maximum time `acquire` waits for a permit before failing.
    timeout: Duration,
}
755
756impl RegionServerParallelism {
757    pub fn from_opts(
758        max_concurrent_queries: usize,
759        concurrent_query_limiter_timeout: Duration,
760    ) -> Option<Self> {
761        if max_concurrent_queries == 0 {
762            return None;
763        }
764        Some(RegionServerParallelism {
765            semaphore: Semaphore::new(max_concurrent_queries),
766            timeout: concurrent_query_limiter_timeout,
767        })
768    }
769
770    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
771        timeout(self.timeout, self.semaphore.acquire())
772            .await
773            .context(ConcurrentQueryLimiterTimeoutSnafu)?
774            .context(ConcurrentQueryLimiterClosedSnafu)
775    }
776}
777
/// Outcome of resolving the engine for a region.
enum CurrentEngine {
    /// The engine resolved for the region.
    Engine(RegionEngineRef),
    /// No engine was resolved; carries an affected-row count instead.
    /// NOTE(review): presumably the caller answers with this count without
    /// calling an engine — confirm against the callers.
    EarlyReturn(AffectedRows),
}
782
783impl Debug for CurrentEngine {
784    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
785        match self {
786            CurrentEngine::Engine(engine) => f
787                .debug_struct("CurrentEngine")
788                .field("engine", &engine.name())
789                .finish(),
790            CurrentEngine::EarlyReturn(rows) => f
791                .debug_struct("CurrentEngine")
792                .field("return", rows)
793                .finish(),
794        }
795    }
796}
797
798impl RegionServerInner {
    /// Creates the shared server state.
    ///
    /// Starts with no registered engines, an empty region map, no topic stats
    /// reporter and no cached Mito engine. `parallelism` is `None` when no
    /// concurrent query limiter is used.
    pub fn new(
        query_engine: QueryEngineRef,
        runtime: Runtime,
        event_listener: RegionServerEventListenerRef,
        table_provider_factory: TableProviderFactoryRef,
        parallelism: Option<RegionServerParallelism>,
    ) -> Self {
        Self {
            engines: RwLock::new(HashMap::new()),
            region_map: DashMap::new(),
            query_engine,
            runtime,
            event_listener,
            table_provider_factory,
            parallelism,
            topic_stats_reporter: RwLock::new(None),
            mito_engine: RwLock::new(None),
        }
    }
818
819    pub fn register_engine(&self, engine: RegionEngineRef) {
820        let engine_name = engine.name();
821        if engine_name == MITO_ENGINE_NAME
822            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
823        {
824            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
825        }
826
827        info!("Region Engine {engine_name} is registered");
828        self.engines
829            .write()
830            .unwrap()
831            .insert(engine_name.to_string(), engine);
832    }
833
    /// Stores the topic stats reporter, replacing any reporter set earlier,
    /// and logs that it was set.
    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
        info!("Set topic stats reporter");
        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
    }
838
839    fn get_engine(
840        &self,
841        region_id: RegionId,
842        region_change: &RegionChange,
843    ) -> Result<CurrentEngine> {
844        let current_region_status = self.region_map.get(&region_id);
845
846        let engine = match region_change {
847            RegionChange::Register(attribute) => match current_region_status {
848                Some(status) => match status.clone() {
849                    RegionEngineWithStatus::Registering(engine) => engine,
850                    RegionEngineWithStatus::Deregistering(_) => {
851                        return error::RegionBusySnafu { region_id }.fail();
852                    }
853                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
854                },
855                _ => self
856                    .engines
857                    .read()
858                    .unwrap()
859                    .get(attribute.engine())
860                    .with_context(|| RegionEngineNotFoundSnafu {
861                        name: attribute.engine(),
862                    })?
863                    .clone(),
864            },
865            RegionChange::Deregisters => match current_region_status {
866                Some(status) => match status.clone() {
867                    RegionEngineWithStatus::Registering(_) => {
868                        return error::RegionBusySnafu { region_id }.fail();
869                    }
870                    RegionEngineWithStatus::Deregistering(_) => {
871                        return Ok(CurrentEngine::EarlyReturn(0));
872                    }
873                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
874                },
875                None => return Ok(CurrentEngine::EarlyReturn(0)),
876            },
877            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
878                match current_region_status {
879                    Some(status) => match status.clone() {
880                        RegionEngineWithStatus::Registering(_) => {
881                            return error::RegionNotReadySnafu { region_id }.fail();
882                        }
883                        RegionEngineWithStatus::Deregistering(_) => {
884                            return error::RegionNotFoundSnafu { region_id }.fail();
885                        }
886                        RegionEngineWithStatus::Ready(engine) => engine,
887                    },
888                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
889                }
890            }
891        };
892
893        Ok(CurrentEngine::Engine(engine))
894    }
895
896    async fn handle_batch_open_requests_inner(
897        &self,
898        engine: RegionEngineRef,
899        parallelism: usize,
900        requests: Vec<(RegionId, RegionOpenRequest)>,
901        ignore_nonexistent_region: bool,
902    ) -> Result<Vec<RegionId>> {
903        let region_changes = requests
904            .iter()
905            .map(|(region_id, open)| {
906                let attribute = parse_region_attribute(&open.engine, &open.options)?;
907                Ok((*region_id, RegionChange::Register(attribute)))
908            })
909            .collect::<Result<HashMap<_, _>>>()?;
910
911        for (&region_id, region_change) in &region_changes {
912            self.set_region_status_not_ready(region_id, &engine, region_change)
913        }
914
915        let mut open_regions = Vec::with_capacity(requests.len());
916        let mut errors = vec![];
917        match engine
918            .handle_batch_open_requests(parallelism, requests)
919            .await
920            .with_context(|_| HandleBatchOpenRequestSnafu)
921        {
922            Ok(results) => {
923                for (region_id, result) in results {
924                    let region_change = &region_changes[&region_id];
925                    match result {
926                        Ok(_) => {
927                            if let Err(e) = self
928                                .set_region_status_ready(region_id, engine.clone(), *region_change)
929                                .await
930                            {
931                                error!(e; "Failed to set region to ready: {}", region_id);
932                                errors.push(BoxedError::new(e));
933                            } else {
934                                open_regions.push(region_id)
935                            }
936                        }
937                        Err(e) => {
938                            self.unset_region_status(region_id, &engine, *region_change);
939                            if e.status_code() == StatusCode::RegionNotFound
940                                && ignore_nonexistent_region
941                            {
942                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
943                            } else {
944                                error!(e; "Failed to open region: {}", region_id);
945                                errors.push(e);
946                            }
947                        }
948                    }
949                }
950            }
951            Err(e) => {
952                for (&region_id, region_change) in &region_changes {
953                    self.unset_region_status(region_id, &engine, *region_change);
954                }
955                error!(e; "Failed to open batch regions");
956                errors.push(BoxedError::new(e));
957            }
958        }
959
960        if !errors.is_empty() {
961            return error::UnexpectedSnafu {
962                // Returns the first error.
963                violated: format!("Failed to open batch regions: {:?}", errors[0]),
964            }
965            .fail();
966        }
967
968        Ok(open_regions)
969    }
970
971    pub async fn handle_batch_open_requests(
972        &self,
973        parallelism: usize,
974        requests: Vec<(RegionId, RegionOpenRequest)>,
975        ignore_nonexistent_region: bool,
976    ) -> Result<Vec<RegionId>> {
977        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
978            HashMap::with_capacity(requests.len());
979        for (region_id, request) in requests {
980            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
981                requests.push((region_id, request));
982            } else {
983                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
984            }
985        }
986
987        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
988        for (engine, requests) in engine_grouped_requests {
989            let engine = self
990                .engines
991                .read()
992                .unwrap()
993                .get(&engine)
994                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
995                .clone();
996            results.push(
997                self.handle_batch_open_requests_inner(
998                    engine,
999                    parallelism,
1000                    requests,
1001                    ignore_nonexistent_region,
1002                )
1003                .await,
1004            )
1005        }
1006
1007        Ok(results
1008            .into_iter()
1009            .collect::<Result<Vec<_>>>()?
1010            .into_iter()
1011            .flatten()
1012            .collect::<Vec<_>>())
1013    }
1014
1015    pub async fn handle_batch_catchup_requests_inner(
1016        &self,
1017        engine: RegionEngineRef,
1018        parallelism: usize,
1019        requests: Vec<(RegionId, RegionCatchupRequest)>,
1020    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1021        for (region_id, _) in &requests {
1022            self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1023        }
1024        let region_ids = requests
1025            .iter()
1026            .map(|(region_id, _)| *region_id)
1027            .collect::<Vec<_>>();
1028        let mut responses = Vec::with_capacity(requests.len());
1029        match engine
1030            .handle_batch_catchup_requests(parallelism, requests)
1031            .await
1032        {
1033            Ok(results) => {
1034                for (region_id, result) in results {
1035                    match result {
1036                        Ok(_) => {
1037                            if let Err(e) = self
1038                                .set_region_status_ready(
1039                                    region_id,
1040                                    engine.clone(),
1041                                    RegionChange::Catchup,
1042                                )
1043                                .await
1044                            {
1045                                error!(e; "Failed to set region to ready: {}", region_id);
1046                                responses.push((region_id, Err(BoxedError::new(e))));
1047                            } else {
1048                                responses.push((region_id, Ok(())));
1049                            }
1050                        }
1051                        Err(e) => {
1052                            self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1053                            error!(e; "Failed to catchup region: {}", region_id);
1054                            responses.push((region_id, Err(e)));
1055                        }
1056                    }
1057                }
1058            }
1059            Err(e) => {
1060                for region_id in region_ids {
1061                    self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1062                }
1063                error!(e; "Failed to catchup batch regions");
1064                return error::UnexpectedSnafu {
1065                    violated: format!("Failed to catchup batch regions: {:?}", e),
1066                }
1067                .fail();
1068            }
1069        }
1070
1071        Ok(responses)
1072    }
1073
1074    pub async fn handle_batch_catchup_requests(
1075        &self,
1076        parallelism: usize,
1077        requests: Vec<(RegionId, RegionCatchupRequest)>,
1078    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1079        let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1080
1081        let mut responses = Vec::with_capacity(requests.len());
1082        for (region_id, request) in requests {
1083            if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1084                match engine {
1085                    CurrentEngine::Engine(engine) => {
1086                        engine_grouped_requests
1087                            .entry(engine.name().to_string())
1088                            .or_default()
1089                            .push((region_id, request));
1090                    }
1091                    CurrentEngine::EarlyReturn(_) => {
1092                        return error::UnexpectedSnafu {
1093                            violated: format!("Unexpected engine type for region {}", region_id),
1094                        }
1095                        .fail();
1096                    }
1097                }
1098            } else {
1099                responses.push((
1100                    region_id,
1101                    Err(BoxedError::new(
1102                        error::RegionNotFoundSnafu { region_id }.build(),
1103                    )),
1104                ));
1105            }
1106        }
1107
1108        for (engine, requests) in engine_grouped_requests {
1109            let engine = self
1110                .engines
1111                .read()
1112                .unwrap()
1113                .get(&engine)
1114                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1115                .clone();
1116            responses.extend(
1117                self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1118                    .await?,
1119            );
1120        }
1121
1122        Ok(responses)
1123    }
1124
1125    // Handle requests in batch.
1126    //
1127    // limitation: all create requests must be in the same engine.
1128    pub async fn handle_batch_request(
1129        &self,
1130        batch_request: BatchRegionDdlRequest,
1131    ) -> Result<RegionResponse> {
1132        let region_changes = match &batch_request {
1133            BatchRegionDdlRequest::Create(requests) => requests
1134                .iter()
1135                .map(|(region_id, create)| {
1136                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
1137                    Ok((*region_id, RegionChange::Register(attribute)))
1138                })
1139                .collect::<Result<Vec<_>>>()?,
1140            BatchRegionDdlRequest::Drop(requests) => requests
1141                .iter()
1142                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1143                .collect::<Vec<_>>(),
1144            BatchRegionDdlRequest::Alter(requests) => requests
1145                .iter()
1146                .map(|(region_id, _)| (*region_id, RegionChange::None))
1147                .collect::<Vec<_>>(),
1148        };
1149
1150        // The ddl procedure will ensure all requests are in the same engine.
1151        // Therefore, we can get the engine from the first request.
1152        let (first_region_id, first_region_change) = region_changes.first().unwrap();
1153        let engine = match self.get_engine(*first_region_id, first_region_change)? {
1154            CurrentEngine::Engine(engine) => engine,
1155            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1156        };
1157
1158        for (region_id, region_change) in region_changes.iter() {
1159            self.set_region_status_not_ready(*region_id, &engine, region_change);
1160        }
1161
1162        let ddl_type = batch_request.request_type();
1163        let result = engine
1164            .handle_batch_ddl_requests(batch_request)
1165            .await
1166            .context(HandleBatchDdlRequestSnafu { ddl_type });
1167
1168        match result {
1169            Ok(result) => {
1170                for (region_id, region_change) in &region_changes {
1171                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1172                        .await?;
1173                }
1174
1175                Ok(RegionResponse {
1176                    affected_rows: result.affected_rows,
1177                    extensions: result.extensions,
1178                    metadata: Vec::new(),
1179                })
1180            }
1181            Err(err) => {
1182                for (region_id, region_change) in region_changes {
1183                    self.unset_region_status(region_id, &engine, region_change);
1184                }
1185
1186                Err(err)
1187            }
1188        }
1189    }
1190
1191    pub async fn handle_request(
1192        &self,
1193        region_id: RegionId,
1194        request: RegionRequest,
1195    ) -> Result<RegionResponse> {
1196        let request_type = request.request_type();
1197        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1198            .with_label_values(&[request_type])
1199            .start_timer();
1200
1201        let region_change = match &request {
1202            RegionRequest::Create(create) => {
1203                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1204                RegionChange::Register(attribute)
1205            }
1206            RegionRequest::Open(open) => {
1207                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1208                RegionChange::Register(attribute)
1209            }
1210            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1211            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1212                RegionChange::Ingest
1213            }
1214            RegionRequest::Alter(_)
1215            | RegionRequest::Flush(_)
1216            | RegionRequest::Compact(_)
1217            | RegionRequest::Truncate(_)
1218            | RegionRequest::BuildIndex(_)
1219            | RegionRequest::EnterStaging(_) => RegionChange::None,
1220            RegionRequest::Catchup(_) => RegionChange::Catchup,
1221        };
1222
1223        let engine = match self.get_engine(region_id, &region_change)? {
1224            CurrentEngine::Engine(engine) => engine,
1225            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1226        };
1227
1228        // Sets corresponding region status to registering/deregistering before the operation.
1229        self.set_region_status_not_ready(region_id, &engine, &region_change);
1230
1231        match engine
1232            .handle_request(region_id, request)
1233            .await
1234            .with_context(|_| HandleRegionRequestSnafu { region_id })
1235        {
1236            Ok(result) => {
1237                // Update metrics
1238                if matches!(region_change, RegionChange::Ingest) {
1239                    crate::metrics::REGION_CHANGED_ROW_COUNT
1240                        .with_label_values(&[request_type])
1241                        .inc_by(result.affected_rows as u64);
1242                }
1243                // Sets corresponding region status to ready.
1244                self.set_region_status_ready(region_id, engine.clone(), region_change)
1245                    .await?;
1246
1247                Ok(RegionResponse {
1248                    affected_rows: result.affected_rows,
1249                    extensions: result.extensions,
1250                    metadata: Vec::new(),
1251                })
1252            }
1253            Err(err) => {
1254                if matches!(region_change, RegionChange::Ingest) {
1255                    crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1256                        .with_label_values(&[request_type])
1257                        .inc();
1258                }
1259                // Removes the region status if the operation fails.
1260                self.unset_region_status(region_id, &engine, region_change);
1261                Err(err)
1262            }
1263        }
1264    }
1265
1266    /// Handles the sync region request.
1267    pub async fn handle_sync_region(
1268        &self,
1269        engine: &RegionEngineRef,
1270        region_id: RegionId,
1271        manifest_info: RegionManifestInfo,
1272    ) -> Result<()> {
1273        let Some(new_opened_regions) = engine
1274            .sync_region(region_id, manifest_info)
1275            .await
1276            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1277            .new_opened_logical_region_ids()
1278        else {
1279            return Ok(());
1280        };
1281
1282        for region in new_opened_regions {
1283            self.region_map
1284                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1285            info!("Logical region {} is registered!", region);
1286        }
1287
1288        Ok(())
1289    }
1290
1291    fn set_region_status_not_ready(
1292        &self,
1293        region_id: RegionId,
1294        engine: &RegionEngineRef,
1295        region_change: &RegionChange,
1296    ) {
1297        match region_change {
1298            RegionChange::Register(_) => {
1299                self.region_map.insert(
1300                    region_id,
1301                    RegionEngineWithStatus::Registering(engine.clone()),
1302                );
1303            }
1304            RegionChange::Deregisters => {
1305                self.region_map.insert(
1306                    region_id,
1307                    RegionEngineWithStatus::Deregistering(engine.clone()),
1308                );
1309            }
1310            _ => {}
1311        }
1312    }
1313
1314    fn unset_region_status(
1315        &self,
1316        region_id: RegionId,
1317        engine: &RegionEngineRef,
1318        region_change: RegionChange,
1319    ) {
1320        match region_change {
1321            RegionChange::None | RegionChange::Ingest => {}
1322            RegionChange::Register(_) => {
1323                self.region_map.remove(&region_id);
1324            }
1325            RegionChange::Deregisters => {
1326                self.region_map
1327                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1328            }
1329            RegionChange::Catchup => {}
1330        }
1331    }
1332
1333    async fn set_region_status_ready(
1334        &self,
1335        region_id: RegionId,
1336        engine: RegionEngineRef,
1337        region_change: RegionChange,
1338    ) -> Result<()> {
1339        let engine_type = engine.name();
1340        match region_change {
1341            RegionChange::None | RegionChange::Ingest => {}
1342            RegionChange::Register(attribute) => {
1343                info!(
1344                    "Region {region_id} is registered to engine {}",
1345                    attribute.engine()
1346                );
1347                self.region_map
1348                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1349
1350                match attribute {
1351                    RegionAttribute::Metric { physical } => {
1352                        if physical {
1353                            // Registers the logical regions belong to the physical region (`region_id`).
1354                            self.register_logical_regions(&engine, region_id).await?;
1355                            // We only send the `on_region_registered` event of the physical region.
1356                            self.event_listener.on_region_registered(region_id);
1357                        }
1358                    }
1359                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1360                    RegionAttribute::File => {
1361                        // do nothing
1362                    }
1363                }
1364            }
1365            RegionChange::Deregisters => {
1366                info!("Region {region_id} is deregistered from engine {engine_type}");
1367                self.region_map
1368                    .remove(&region_id)
1369                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1370                self.event_listener.on_region_deregistered(region_id);
1371            }
1372            RegionChange::Catchup => {
1373                if is_metric_engine(engine.name()) {
1374                    // Registers the logical regions belong to the physical region (`region_id`).
1375                    self.register_logical_regions(&engine, region_id).await?;
1376                }
1377            }
1378        }
1379        Ok(())
1380    }
1381
1382    async fn register_logical_regions(
1383        &self,
1384        engine: &RegionEngineRef,
1385        physical_region_id: RegionId,
1386    ) -> Result<()> {
1387        let metric_engine =
1388            engine
1389                .as_any()
1390                .downcast_ref::<MetricEngine>()
1391                .context(UnexpectedSnafu {
1392                    violated: format!(
1393                        "expecting engine type '{}', actual '{}'",
1394                        METRIC_ENGINE_NAME,
1395                        engine.name(),
1396                    ),
1397                })?;
1398
1399        let logical_regions = metric_engine
1400            .logical_regions(physical_region_id)
1401            .await
1402            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1403
1404        for region in logical_regions {
1405            self.region_map
1406                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1407            info!("Logical region {} is registered!", region);
1408        }
1409        Ok(())
1410    }
1411
1412    pub async fn handle_read(
1413        &self,
1414        request: QueryRequest,
1415        query_ctx: QueryContextRef,
1416    ) -> Result<SendableRecordBatchStream> {
1417        // TODO(ruihang): add metrics and set trace id
1418
1419        let result = self
1420            .query_engine
1421            .execute(request.plan, query_ctx)
1422            .await
1423            .context(ExecuteLogicalPlanSnafu)?;
1424
1425        match result.data {
1426            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1427                UnsupportedOutputSnafu { expected: "stream" }.fail()
1428            }
1429            OutputData::Stream(stream) => Ok(stream),
1430        }
1431    }
1432
1433    async fn stop(&self) -> Result<()> {
1434        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1435        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1436        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1437        //
1438        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1439        // it here, collect the values first then use later separately.
1440
1441        let regions = self
1442            .region_map
1443            .iter()
1444            .map(|x| (*x.key(), x.value().clone()))
1445            .collect::<Vec<_>>();
1446        let num_regions = regions.len();
1447
1448        for (region_id, engine) in regions {
1449            let closed = engine
1450                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1451                .await;
1452            match closed {
1453                Ok(_) => debug!("Region {region_id} is closed"),
1454                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1455            }
1456        }
1457        self.region_map.clear();
1458        info!("closed {num_regions} regions");
1459
1460        drop(self.mito_engine.write().unwrap().take());
1461        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1462        for (engine_name, engine) in engines {
1463            engine
1464                .stop()
1465                .await
1466                .context(StopRegionEngineSnafu { name: &engine_name })?;
1467            info!("Region engine {engine_name} is stopped");
1468        }
1469
1470        Ok(())
1471    }
1472}
1473
/// How a request affects the registration state of a region in the region map.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the registration state (alter, flush,
    /// compact, truncate, build index, enter staging).
    None,
    /// The request registers the region (create, open); carries the attribute
    /// parsed from the request's engine name and options.
    Register(RegionAttribute),
    /// The request deregisters the region (close, drop).
    Deregisters,
    /// The request is a catchup request.
    Catchup,
    /// The request writes data into the region (put, delete, bulk inserts).
    Ingest,
}
1482
/// Returns `true` when `engine` is the name of the metric engine.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1486
1487fn parse_region_attribute(
1488    engine: &str,
1489    options: &HashMap<String, String>,
1490) -> Result<RegionAttribute> {
1491    match engine {
1492        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1493        METRIC_ENGINE_NAME => {
1494            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1495
1496            Ok(RegionAttribute::Metric { physical })
1497        }
1498        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1499        _ => error::UnexpectedSnafu {
1500            violated: format!("Unknown engine: {}", engine),
1501        }
1502        .fail(),
1503    }
1504}
1505
/// Engine-specific attribute of a region, derived from the engine name and
/// options of a create/open request.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region served by the mito engine.
    Mito,
    /// A region served by the metric engine; `physical` is `false` when the
    /// request options contain `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region served by the file engine.
    File,
}
1512
1513impl RegionAttribute {
1514    fn engine(&self) -> &'static str {
1515        match self {
1516            RegionAttribute::Mito => MITO_ENGINE_NAME,
1517            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1518            RegionAttribute::File => FILE_ENGINE_NAME,
1519        }
1520    }
1521}
1522
1523#[cfg(test)]
1524mod tests {
1525
1526    use std::assert_matches::assert_matches;
1527
1528    use api::v1::SemanticType;
1529    use common_error::ext::ErrorExt;
1530    use datatypes::prelude::ConcreteDataType;
1531    use mito2::test_util::CreateRequestBuilder;
1532    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1533    use store_api::region_engine::RegionEngine;
1534    use store_api::region_request::{
1535        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1536    };
1537    use store_api::storage::RegionId;
1538
1539    use super::*;
1540    use crate::error::Result;
1541    use crate::tests::{MockRegionEngine, mock_region_server};
1542
1543    #[tokio::test]
1544    async fn test_region_registering() {
1545        common_telemetry::init_default_ut_logging();
1546
1547        let mut mock_region_server = mock_region_server();
1548        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1549        let engine_name = engine.name();
1550        mock_region_server.register_engine(engine.clone());
1551        let region_id = RegionId::new(1, 1);
1552        let builder = CreateRequestBuilder::new();
1553        let create_req = builder.build();
1554        // Tries to create/open a registering region.
1555        mock_region_server.inner.region_map.insert(
1556            region_id,
1557            RegionEngineWithStatus::Registering(engine.clone()),
1558        );
1559        let response = mock_region_server
1560            .handle_request(region_id, RegionRequest::Create(create_req))
1561            .await
1562            .unwrap();
1563        assert_eq!(response.affected_rows, 0);
1564        let status = mock_region_server
1565            .inner
1566            .region_map
1567            .get(&region_id)
1568            .unwrap()
1569            .clone();
1570        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1571
1572        mock_region_server.inner.region_map.insert(
1573            region_id,
1574            RegionEngineWithStatus::Registering(engine.clone()),
1575        );
1576        let response = mock_region_server
1577            .handle_request(
1578                region_id,
1579                RegionRequest::Open(RegionOpenRequest {
1580                    engine: engine_name.to_string(),
1581                    table_dir: String::new(),
1582                    path_type: PathType::Bare,
1583                    options: Default::default(),
1584                    skip_wal_replay: false,
1585                    checkpoint: None,
1586                }),
1587            )
1588            .await
1589            .unwrap();
1590        assert_eq!(response.affected_rows, 0);
1591        let status = mock_region_server
1592            .inner
1593            .region_map
1594            .get(&region_id)
1595            .unwrap()
1596            .clone();
1597        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1598    }
1599
1600    #[tokio::test]
1601    async fn test_region_deregistering() {
1602        common_telemetry::init_default_ut_logging();
1603
1604        let mut mock_region_server = mock_region_server();
1605        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1606
1607        mock_region_server.register_engine(engine.clone());
1608
1609        let region_id = RegionId::new(1, 1);
1610
1611        // Tries to drop/close a registering region.
1612        mock_region_server.inner.region_map.insert(
1613            region_id,
1614            RegionEngineWithStatus::Deregistering(engine.clone()),
1615        );
1616
1617        let response = mock_region_server
1618            .handle_request(
1619                region_id,
1620                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1621            )
1622            .await
1623            .unwrap();
1624        assert_eq!(response.affected_rows, 0);
1625
1626        let status = mock_region_server
1627            .inner
1628            .region_map
1629            .get(&region_id)
1630            .unwrap()
1631            .clone();
1632        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1633
1634        mock_region_server.inner.region_map.insert(
1635            region_id,
1636            RegionEngineWithStatus::Deregistering(engine.clone()),
1637        );
1638
1639        let response = mock_region_server
1640            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1641            .await
1642            .unwrap();
1643        assert_eq!(response.affected_rows, 0);
1644
1645        let status = mock_region_server
1646            .inner
1647            .region_map
1648            .get(&region_id)
1649            .unwrap()
1650            .clone();
1651        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1652    }
1653
1654    #[tokio::test]
1655    async fn test_region_not_ready() {
1656        common_telemetry::init_default_ut_logging();
1657
1658        let mut mock_region_server = mock_region_server();
1659        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1660
1661        mock_region_server.register_engine(engine.clone());
1662
1663        let region_id = RegionId::new(1, 1);
1664
1665        // Tries to drop/close a registering region.
1666        mock_region_server.inner.region_map.insert(
1667            region_id,
1668            RegionEngineWithStatus::Registering(engine.clone()),
1669        );
1670
1671        let err = mock_region_server
1672            .handle_request(
1673                region_id,
1674                RegionRequest::Truncate(RegionTruncateRequest::All),
1675            )
1676            .await
1677            .unwrap_err();
1678
1679        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1680    }
1681
1682    #[tokio::test]
1683    async fn test_region_request_failed() {
1684        common_telemetry::init_default_ut_logging();
1685
1686        let mut mock_region_server = mock_region_server();
1687        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1688            MITO_ENGINE_NAME,
1689            Box::new(|_region_id, _request| {
1690                error::UnexpectedSnafu {
1691                    violated: "test".to_string(),
1692                }
1693                .fail()
1694            }),
1695        );
1696
1697        mock_region_server.register_engine(engine.clone());
1698
1699        let region_id = RegionId::new(1, 1);
1700        let builder = CreateRequestBuilder::new();
1701        let create_req = builder.build();
1702        mock_region_server
1703            .handle_request(region_id, RegionRequest::Create(create_req))
1704            .await
1705            .unwrap_err();
1706
1707        let status = mock_region_server.inner.region_map.get(&region_id);
1708        assert!(status.is_none());
1709
1710        mock_region_server
1711            .inner
1712            .region_map
1713            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1714
1715        mock_region_server
1716            .handle_request(
1717                region_id,
1718                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1719            )
1720            .await
1721            .unwrap_err();
1722
1723        let status = mock_region_server.inner.region_map.get(&region_id);
1724        assert!(status.is_some());
1725    }
1726
1727    #[tokio::test]
1728    async fn test_batch_open_region_ignore_nonexistent_regions() {
1729        common_telemetry::init_default_ut_logging();
1730        let mut mock_region_server = mock_region_server();
1731        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1732            MITO_ENGINE_NAME,
1733            Box::new(|region_id, _request| {
1734                if region_id == RegionId::new(1, 1) {
1735                    error::RegionNotFoundSnafu { region_id }.fail()
1736                } else {
1737                    Ok(0)
1738                }
1739            }),
1740        );
1741        mock_region_server.register_engine(engine.clone());
1742
1743        let region_ids = mock_region_server
1744            .handle_batch_open_requests(
1745                8,
1746                vec![
1747                    (
1748                        RegionId::new(1, 1),
1749                        RegionOpenRequest {
1750                            engine: MITO_ENGINE_NAME.to_string(),
1751                            table_dir: String::new(),
1752                            path_type: PathType::Bare,
1753                            options: Default::default(),
1754                            skip_wal_replay: false,
1755                            checkpoint: None,
1756                        },
1757                    ),
1758                    (
1759                        RegionId::new(1, 2),
1760                        RegionOpenRequest {
1761                            engine: MITO_ENGINE_NAME.to_string(),
1762                            table_dir: String::new(),
1763                            path_type: PathType::Bare,
1764                            options: Default::default(),
1765                            skip_wal_replay: false,
1766                            checkpoint: None,
1767                        },
1768                    ),
1769                ],
1770                true,
1771            )
1772            .await
1773            .unwrap();
1774        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1775
1776        let err = mock_region_server
1777            .handle_batch_open_requests(
1778                8,
1779                vec![
1780                    (
1781                        RegionId::new(1, 1),
1782                        RegionOpenRequest {
1783                            engine: MITO_ENGINE_NAME.to_string(),
1784                            table_dir: String::new(),
1785                            path_type: PathType::Bare,
1786                            options: Default::default(),
1787                            skip_wal_replay: false,
1788                            checkpoint: None,
1789                        },
1790                    ),
1791                    (
1792                        RegionId::new(1, 2),
1793                        RegionOpenRequest {
1794                            engine: MITO_ENGINE_NAME.to_string(),
1795                            table_dir: String::new(),
1796                            path_type: PathType::Bare,
1797                            options: Default::default(),
1798                            skip_wal_replay: false,
1799                            checkpoint: None,
1800                        },
1801                    ),
1802                ],
1803                false,
1804            )
1805            .await
1806            .unwrap_err();
1807        assert_eq!(err.status_code(), StatusCode::Unexpected);
1808    }
1809
1810    struct CurrentEngineTest {
1811        region_id: RegionId,
1812        current_region_status: Option<RegionEngineWithStatus>,
1813        region_change: RegionChange,
1814        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1815    }
1816
1817    #[tokio::test]
1818    async fn test_current_engine() {
1819        common_telemetry::init_default_ut_logging();
1820
1821        let mut mock_region_server = mock_region_server();
1822        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1823        mock_region_server.register_engine(engine.clone());
1824
1825        let region_id = RegionId::new(1024, 1);
1826        let tests = vec![
1827            // RegionChange::None
1828            CurrentEngineTest {
1829                region_id,
1830                current_region_status: None,
1831                region_change: RegionChange::None,
1832                assert: Box::new(|result| {
1833                    let err = result.unwrap_err();
1834                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1835                }),
1836            },
1837            CurrentEngineTest {
1838                region_id,
1839                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1840                region_change: RegionChange::None,
1841                assert: Box::new(|result| {
1842                    let current_engine = result.unwrap();
1843                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1844                }),
1845            },
1846            CurrentEngineTest {
1847                region_id,
1848                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1849                region_change: RegionChange::None,
1850                assert: Box::new(|result| {
1851                    let err = result.unwrap_err();
1852                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1853                }),
1854            },
1855            CurrentEngineTest {
1856                region_id,
1857                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1858                region_change: RegionChange::None,
1859                assert: Box::new(|result| {
1860                    let err = result.unwrap_err();
1861                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1862                }),
1863            },
1864            // RegionChange::Register
1865            CurrentEngineTest {
1866                region_id,
1867                current_region_status: None,
1868                region_change: RegionChange::Register(RegionAttribute::Mito),
1869                assert: Box::new(|result| {
1870                    let current_engine = result.unwrap();
1871                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1872                }),
1873            },
1874            CurrentEngineTest {
1875                region_id,
1876                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1877                region_change: RegionChange::Register(RegionAttribute::Mito),
1878                assert: Box::new(|result| {
1879                    let current_engine = result.unwrap();
1880                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1881                }),
1882            },
1883            CurrentEngineTest {
1884                region_id,
1885                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1886                region_change: RegionChange::Register(RegionAttribute::Mito),
1887                assert: Box::new(|result| {
1888                    let err = result.unwrap_err();
1889                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1890                }),
1891            },
1892            CurrentEngineTest {
1893                region_id,
1894                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1895                region_change: RegionChange::Register(RegionAttribute::Mito),
1896                assert: Box::new(|result| {
1897                    let current_engine = result.unwrap();
1898                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1899                }),
1900            },
1901            // RegionChange::Deregister
1902            CurrentEngineTest {
1903                region_id,
1904                current_region_status: None,
1905                region_change: RegionChange::Deregisters,
1906                assert: Box::new(|result| {
1907                    let current_engine = result.unwrap();
1908                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1909                }),
1910            },
1911            CurrentEngineTest {
1912                region_id,
1913                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1914                region_change: RegionChange::Deregisters,
1915                assert: Box::new(|result| {
1916                    let err = result.unwrap_err();
1917                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1918                }),
1919            },
1920            CurrentEngineTest {
1921                region_id,
1922                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1923                region_change: RegionChange::Deregisters,
1924                assert: Box::new(|result| {
1925                    let current_engine = result.unwrap();
1926                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1927                }),
1928            },
1929            CurrentEngineTest {
1930                region_id,
1931                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1932                region_change: RegionChange::Deregisters,
1933                assert: Box::new(|result| {
1934                    let current_engine = result.unwrap();
1935                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1936                }),
1937            },
1938        ];
1939
1940        for test in tests {
1941            let CurrentEngineTest {
1942                region_id,
1943                current_region_status,
1944                region_change,
1945                assert,
1946            } = test;
1947
1948            // Sets up
1949            if let Some(status) = current_region_status {
1950                mock_region_server
1951                    .inner
1952                    .region_map
1953                    .insert(region_id, status);
1954            } else {
1955                mock_region_server.inner.region_map.remove(&region_id);
1956            }
1957
1958            let result = mock_region_server
1959                .inner
1960                .get_engine(region_id, &region_change);
1961
1962            assert(result);
1963        }
1964    }
1965
1966    #[tokio::test]
1967    async fn test_region_server_parallelism() {
1968        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1969        let first_query = p.acquire().await;
1970        assert!(first_query.is_ok());
1971        let second_query = p.acquire().await;
1972        assert!(second_query.is_ok());
1973        let third_query = p.acquire().await;
1974        assert!(third_query.is_err());
1975        let err = third_query.unwrap_err();
1976        assert_eq!(
1977            err.output_msg(),
1978            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1979        );
1980        drop(first_query);
1981        let forth_query = p.acquire().await;
1982        assert!(forth_query.is_ok());
1983    }
1984
1985    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1986        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1987        metadata_builder.push_column_metadata(ColumnMetadata {
1988            column_schema: datatypes::schema::ColumnSchema::new(
1989                "timestamp",
1990                ConcreteDataType::timestamp_nanosecond_datatype(),
1991                false,
1992            ),
1993            semantic_type: SemanticType::Timestamp,
1994            column_id: 0,
1995        });
1996        metadata_builder.push_column_metadata(ColumnMetadata {
1997            column_schema: datatypes::schema::ColumnSchema::new(
1998                "file",
1999                ConcreteDataType::string_datatype(),
2000                true,
2001            ),
2002            semantic_type: SemanticType::Tag,
2003            column_id: 1,
2004        });
2005        metadata_builder.push_column_metadata(ColumnMetadata {
2006            column_schema: datatypes::schema::ColumnSchema::new(
2007                "message",
2008                ConcreteDataType::string_datatype(),
2009                true,
2010            ),
2011            semantic_type: SemanticType::Field,
2012            column_id: 2,
2013        });
2014        metadata_builder.primary_key(vec![1]);
2015        metadata_builder.build().unwrap()
2016    }
2017
2018    #[tokio::test]
2019    async fn test_handle_list_metadata_request() {
2020        common_telemetry::init_default_ut_logging();
2021
2022        let mut mock_region_server = mock_region_server();
2023        let region_id_1 = RegionId::new(1, 0);
2024        let region_id_2 = RegionId::new(2, 0);
2025
2026        let metadata_1 = mock_region_metadata(region_id_1);
2027        let metadata_2 = mock_region_metadata(region_id_2);
2028        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2029
2030        let metadata_1 = Arc::new(metadata_1);
2031        let metadata_2 = Arc::new(metadata_2);
2032        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2033            MITO_ENGINE_NAME,
2034            Box::new(move |region_id| {
2035                if region_id == region_id_1 {
2036                    Ok(metadata_1.clone())
2037                } else if region_id == region_id_2 {
2038                    Ok(metadata_2.clone())
2039                } else {
2040                    error::RegionNotFoundSnafu { region_id }.fail()
2041                }
2042            }),
2043        );
2044
2045        mock_region_server.register_engine(engine.clone());
2046        mock_region_server
2047            .inner
2048            .region_map
2049            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2050        mock_region_server
2051            .inner
2052            .region_map
2053            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2054
2055        // All regions exist.
2056        let list_metadata_request = ListMetadataRequest {
2057            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2058        };
2059        let response = mock_region_server
2060            .handle_list_metadata_request(&list_metadata_request)
2061            .await
2062            .unwrap();
2063        let decoded_metadata: Vec<Option<RegionMetadata>> =
2064            serde_json::from_slice(&response.metadata).unwrap();
2065        assert_eq!(metadatas, decoded_metadata);
2066    }
2067
2068    #[tokio::test]
2069    async fn test_handle_list_metadata_not_found() {
2070        common_telemetry::init_default_ut_logging();
2071
2072        let mut mock_region_server = mock_region_server();
2073        let region_id_1 = RegionId::new(1, 0);
2074        let region_id_2 = RegionId::new(2, 0);
2075
2076        let metadata_1 = mock_region_metadata(region_id_1);
2077        let metadatas = vec![Some(metadata_1.clone()), None];
2078
2079        let metadata_1 = Arc::new(metadata_1);
2080        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2081            MITO_ENGINE_NAME,
2082            Box::new(move |region_id| {
2083                if region_id == region_id_1 {
2084                    Ok(metadata_1.clone())
2085                } else {
2086                    error::RegionNotFoundSnafu { region_id }.fail()
2087                }
2088            }),
2089        );
2090
2091        mock_region_server.register_engine(engine.clone());
2092        mock_region_server
2093            .inner
2094            .region_map
2095            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2096
2097        // Not in region map.
2098        let list_metadata_request = ListMetadataRequest {
2099            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2100        };
2101        let response = mock_region_server
2102            .handle_list_metadata_request(&list_metadata_request)
2103            .await
2104            .unwrap();
2105        let decoded_metadata: Vec<Option<RegionMetadata>> =
2106            serde_json::from_slice(&response.metadata).unwrap();
2107        assert_eq!(metadatas, decoded_metadata);
2108
2109        // Not in region engine.
2110        mock_region_server
2111            .inner
2112            .region_map
2113            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2114        let response = mock_region_server
2115            .handle_list_metadata_request(&list_metadata_request)
2116            .await
2117            .unwrap();
2118        let decoded_metadata: Vec<Option<RegionMetadata>> =
2119            serde_json::from_slice(&response.metadata).unwrap();
2120        assert_eq!(metadatas, decoded_metadata);
2121    }
2122
2123    #[tokio::test]
2124    async fn test_handle_list_metadata_failed() {
2125        common_telemetry::init_default_ut_logging();
2126
2127        let mut mock_region_server = mock_region_server();
2128        let region_id_1 = RegionId::new(1, 0);
2129
2130        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2131            MITO_ENGINE_NAME,
2132            Box::new(move |region_id| {
2133                error::UnexpectedSnafu {
2134                    violated: format!("Failed to get region {region_id}"),
2135                }
2136                .fail()
2137            }),
2138        );
2139
2140        mock_region_server.register_engine(engine.clone());
2141        mock_region_server
2142            .inner
2143            .region_map
2144            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2145
2146        // Failed to get.
2147        let list_metadata_request = ListMetadataRequest {
2148            region_ids: vec![region_id_1.as_u64()],
2149        };
2150        mock_region_server
2151            .handle_list_metadata_request(&list_metadata_request)
2152            .await
2153            .unwrap_err();
2154    }
2155}