datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::{Arc, RwLock};
21use std::time::Duration;
22
23use api::region::RegionResponse;
24use api::v1::meta::TopicStat;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
28};
29use api::v1::{ResponseHeader, Status};
30use arrow_flight::{FlightData, Ticket};
31use async_trait::async_trait;
32use bytes::Bytes;
33use common_error::ext::{BoxedError, ErrorExt};
34use common_error::status_code::StatusCode;
35use common_meta::datanode::TopicStatsReporter;
36use common_query::OutputData;
37use common_query::request::QueryRequest;
38use common_recordbatch::SendableRecordBatchStream;
39use common_runtime::Runtime;
40use common_telemetry::tracing::{self, info_span};
41use common_telemetry::tracing_context::{FutureExt, TracingContext};
42use common_telemetry::{debug, error, info, warn};
43use dashmap::DashMap;
44use datafusion::datasource::TableProvider;
45use datafusion_common::tree_node::TreeNode;
46use futures_util::future::try_join_all;
47use metric_engine::engine::MetricEngine;
48use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
49use prost::Message;
50use query::QueryEngineRef;
51pub use query::dummy_catalog::{
52    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
53};
54use serde_json;
55use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
56use servers::grpc::FlightCompression;
57use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
58use servers::grpc::region_server::RegionServerHandler;
59use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
60use snafu::{OptionExt, ResultExt, ensure};
61use store_api::metric_engine_consts::{
62    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
63};
64use store_api::region_engine::{
65    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
66    SettableRegionRoleState,
67};
68use store_api::region_request::{
69    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
70    RegionOpenRequest, RegionRequest,
71};
72use store_api::storage::RegionId;
73use tokio::sync::{Semaphore, SemaphorePermit};
74use tokio::time::timeout;
75use tonic::{Request, Response, Result as TonicResult};
76
77use crate::error::{
78    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
79    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
80    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
81    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
82    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
83    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
84};
85use crate::event_listener::RegionServerEventListenerRef;
86use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
87
/// Handle to the datanode's region server.
///
/// The type derives [`Clone`]; all mutable state lives behind the shared
/// [`RegionServerInner`], so every clone refers to the same engines and regions.
#[derive(Clone)]
pub struct RegionServer {
    /// Shared state: registered engines, the region map, the query engine and the runtime.
    inner: Arc<RegionServerInner>,
    /// Compression setting handed to `FlightRecordBatchStream::new` when serving `do_get`.
    flight_compression: FlightCompression,
}
93
/// Summary of a single region, as produced by [`RegionServer::reportable_regions`].
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine the region is registered with.
    pub engine: String,
    /// Role the engine reported for the region.
    pub role: RegionRole,
}
99
100impl RegionServer {
101    pub fn new(
102        query_engine: QueryEngineRef,
103        runtime: Runtime,
104        event_listener: RegionServerEventListenerRef,
105        flight_compression: FlightCompression,
106    ) -> Self {
107        Self::with_table_provider(
108            query_engine,
109            runtime,
110            event_listener,
111            Arc::new(DummyTableProviderFactory),
112            0,
113            Duration::from_millis(0),
114            flight_compression,
115        )
116    }
117
118    pub fn with_table_provider(
119        query_engine: QueryEngineRef,
120        runtime: Runtime,
121        event_listener: RegionServerEventListenerRef,
122        table_provider_factory: TableProviderFactoryRef,
123        max_concurrent_queries: usize,
124        concurrent_query_limiter_timeout: Duration,
125        flight_compression: FlightCompression,
126    ) -> Self {
127        Self {
128            inner: Arc::new(RegionServerInner::new(
129                query_engine,
130                runtime,
131                event_listener,
132                table_provider_factory,
133                RegionServerParallelism::from_opts(
134                    max_concurrent_queries,
135                    concurrent_query_limiter_timeout,
136                ),
137            )),
138            flight_compression,
139        }
140    }
141
142    /// Registers an engine.
143    pub fn register_engine(&mut self, engine: RegionEngineRef) {
144        self.inner.register_engine(engine);
145    }
146
147    /// Sets the topic stats.
148    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
149        self.inner.set_topic_stats_reporter(topic_stats_reporter);
150    }
151
152    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
153    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
154        match self.inner.get_engine(region_id, &RegionChange::None) {
155            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
156            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
157            Err(error::Error::RegionNotFound { .. }) => Ok(None),
158            Err(err) => Err(err),
159        }
160    }
161
162    /// Gets the MitoEngine if it's registered.
163    pub fn mito_engine(&self) -> Option<MitoEngine> {
164        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
165            Some(mito)
166        } else {
167            self.inner
168                .engines
169                .read()
170                .unwrap()
171                .get(MITO_ENGINE_NAME)
172                .cloned()
173                .and_then(|e| {
174                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
175                    if mito.is_none() {
176                        warn!("Mito engine not found in region server engines");
177                    }
178                    mito
179                })
180        }
181    }
182
183    #[tracing::instrument(skip_all)]
184    pub async fn handle_batch_open_requests(
185        &self,
186        parallelism: usize,
187        requests: Vec<(RegionId, RegionOpenRequest)>,
188        ignore_nonexistent_region: bool,
189    ) -> Result<Vec<RegionId>> {
190        self.inner
191            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
192            .await
193    }
194
195    #[tracing::instrument(skip_all)]
196    pub async fn handle_batch_catchup_requests(
197        &self,
198        parallelism: usize,
199        requests: Vec<(RegionId, RegionCatchupRequest)>,
200    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
201        self.inner
202            .handle_batch_catchup_requests(parallelism, requests)
203            .await
204    }
205
206    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
207    pub async fn handle_request(
208        &self,
209        region_id: RegionId,
210        request: RegionRequest,
211    ) -> Result<RegionResponse> {
212        self.inner.handle_request(region_id, request).await
213    }
214
215    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
216    async fn table_provider(
217        &self,
218        region_id: RegionId,
219        ctx: Option<QueryContextRef>,
220    ) -> Result<Arc<dyn TableProvider>> {
221        let status = self
222            .inner
223            .region_map
224            .get(&region_id)
225            .context(RegionNotFoundSnafu { region_id })?
226            .clone();
227        ensure!(
228            matches!(status, RegionEngineWithStatus::Ready(_)),
229            RegionNotReadySnafu { region_id }
230        );
231
232        self.inner
233            .table_provider_factory
234            .create(region_id, status.into_engine(), ctx)
235            .await
236            .context(ExecuteLogicalPlanSnafu)
237    }
238
239    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
240    pub async fn handle_remote_read(
241        &self,
242        request: api::v1::region::QueryRequest,
243        query_ctx: QueryContextRef,
244    ) -> Result<SendableRecordBatchStream> {
245        let _permit = if let Some(p) = &self.inner.parallelism {
246            Some(p.acquire().await?)
247        } else {
248            None
249        };
250
251        let region_id = RegionId::from_u64(request.region_id);
252        let catalog_list = Arc::new(NameAwareCatalogList::new(
253            self.clone(),
254            region_id,
255            query_ctx.clone(),
256        ));
257
258        if query_ctx.explain_verbose() {
259            common_telemetry::info!("Handle remote read for region: {}", region_id);
260        }
261
262        let decoder = self
263            .inner
264            .query_engine
265            .engine_context(query_ctx.clone())
266            .new_plan_decoder()
267            .context(NewPlanDecoderSnafu)?;
268
269        let plan = decoder
270            .decode(Bytes::from(request.plan), catalog_list, false)
271            .await
272            .context(DecodeLogicalPlanSnafu)?;
273
274        self.inner
275            .handle_read(
276                QueryRequest {
277                    header: request.header,
278                    region_id,
279                    plan,
280                },
281                query_ctx,
282            )
283            .await
284    }
285
286    #[tracing::instrument(skip_all)]
287    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
288        let _permit = if let Some(p) = &self.inner.parallelism {
289            Some(p.acquire().await?)
290        } else {
291            None
292        };
293
294        let ctx = request.header.as_ref().map(|h| h.into());
295        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
296
297        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
298            .context(DataFusionSnafu)?;
299        let mut injector = injector_builder
300            .build(self, request.region_id, query_ctx.clone())
301            .await?;
302
303        let plan = request
304            .plan
305            .rewrite(&mut injector)
306            .context(DataFusionSnafu)?
307            .data;
308
309        self.inner
310            .handle_read(QueryRequest { plan, ..request }, query_ctx)
311            .await
312    }
313
314    /// Returns all opened and reportable regions.
315    ///
316    /// Notes: except all metrics regions.
317    pub fn reportable_regions(&self) -> Vec<RegionStat> {
318        self.inner
319            .region_map
320            .iter()
321            .filter_map(|e| {
322                let region_id = *e.key();
323                // Filters out any regions whose role equals None.
324                e.role(region_id).map(|role| RegionStat {
325                    region_id,
326                    engine: e.value().name().to_string(),
327                    role,
328                })
329            })
330            .collect()
331    }
332
333    /// Returns the reportable topics.
334    pub fn topic_stats(&self) -> Vec<TopicStat> {
335        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
336        let Some(reporter) = reporter.as_mut() else {
337            return vec![];
338        };
339        reporter
340            .reportable_topics()
341            .into_iter()
342            .map(|stat| TopicStat {
343                topic_name: stat.topic,
344                record_size: stat.record_size,
345                record_num: stat.record_num,
346                latest_entry_id: stat.latest_entry_id,
347            })
348            .collect()
349    }
350
351    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
352        self.inner.region_map.get(&region_id).and_then(|engine| {
353            engine.role(region_id).map(|role| match role {
354                RegionRole::Follower => false,
355                RegionRole::Leader => true,
356                RegionRole::DowngradingLeader => true,
357            })
358        })
359    }
360
361    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
362        let engine = self
363            .inner
364            .region_map
365            .get(&region_id)
366            .with_context(|| RegionNotFoundSnafu { region_id })?;
367        engine
368            .set_region_role(region_id, role)
369            .with_context(|_| HandleRegionRequestSnafu { region_id })
370    }
371
372    /// Set region role state gracefully.
373    ///
374    /// For [SettableRegionRoleState::Follower]:
375    /// After the call returns, the engine ensures that
376    /// no **further** write or flush operations will succeed in this region.
377    ///
378    /// For [SettableRegionRoleState::DowngradingLeader]:
379    /// After the call returns, the engine ensures that
380    /// no **further** write operations will succeed in this region.
381    pub async fn set_region_role_state_gracefully(
382        &self,
383        region_id: RegionId,
384        state: SettableRegionRoleState,
385    ) -> Result<SetRegionRoleStateResponse> {
386        match self.inner.region_map.get(&region_id) {
387            Some(engine) => Ok(engine
388                .set_region_role_state_gracefully(region_id, state)
389                .await
390                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
391            None => Ok(SetRegionRoleStateResponse::NotFound),
392        }
393    }
394
395    pub fn runtime(&self) -> Runtime {
396        self.inner.runtime.clone()
397    }
398
399    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
400        match self.inner.region_map.get(&region_id) {
401            Some(e) => e.region_statistic(region_id),
402            None => None,
403        }
404    }
405
406    /// Stop the region server.
407    pub async fn stop(&self) -> Result<()> {
408        self.inner.stop().await
409    }
410
411    #[cfg(test)]
412    /// Registers a region for test purpose.
413    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
414        {
415            let mut engines = self.inner.engines.write().unwrap();
416            if !engines.contains_key(engine.name()) {
417                debug!("Registering test engine: {}", engine.name());
418                engines.insert(engine.name().to_string(), engine.clone());
419            }
420        }
421
422        self.inner
423            .region_map
424            .insert(region_id, RegionEngineWithStatus::Ready(engine));
425    }
426
427    async fn handle_batch_ddl_requests(
428        &self,
429        request: region_request::Body,
430    ) -> Result<RegionResponse> {
431        // Safety: we have already checked the request type in `RegionServer::handle()`.
432        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
433            .context(BuildRegionRequestsSnafu)?
434            .unwrap();
435        let tracing_context = TracingContext::from_current_span();
436
437        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
438        self.inner
439            .handle_batch_request(batch_request)
440            .trace(span)
441            .await
442    }
443
444    async fn handle_requests_in_parallel(
445        &self,
446        request: region_request::Body,
447    ) -> Result<RegionResponse> {
448        let requests =
449            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
450        let tracing_context = TracingContext::from_current_span();
451
452        let join_tasks = requests.into_iter().map(|(region_id, req)| {
453            let self_to_move = self;
454            let span = tracing_context.attach(info_span!(
455                "RegionServer::handle_region_request",
456                region_id = region_id.to_string()
457            ));
458            async move {
459                self_to_move
460                    .handle_request(region_id, req)
461                    .trace(span)
462                    .await
463            }
464        });
465
466        let results = try_join_all(join_tasks).await?;
467        let mut affected_rows = 0;
468        let mut extensions = HashMap::new();
469        for result in results {
470            affected_rows += result.affected_rows;
471            extensions.extend(result.extensions);
472        }
473
474        Ok(RegionResponse {
475            affected_rows,
476            extensions,
477            metadata: Vec::new(),
478        })
479    }
480
481    async fn handle_requests_in_serial(
482        &self,
483        request: region_request::Body,
484    ) -> Result<RegionResponse> {
485        let requests =
486            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
487        let tracing_context = TracingContext::from_current_span();
488
489        let mut affected_rows = 0;
490        let mut extensions = HashMap::new();
491        // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
492        // modify this part to avoid inefficient serial loop calls.
493        for (region_id, req) in requests {
494            let span = tracing_context.attach(info_span!(
495                "RegionServer::handle_region_request",
496                region_id = region_id.to_string()
497            ));
498            let result = self.handle_request(region_id, req).trace(span).await?;
499
500            affected_rows += result.affected_rows;
501            extensions.extend(result.extensions);
502        }
503
504        Ok(RegionResponse {
505            affected_rows,
506            extensions,
507            metadata: Vec::new(),
508        })
509    }
510
511    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
512        let region_id = RegionId::from_u64(request.region_id);
513        let manifest_info = request
514            .manifest_info
515            .context(error::MissingRequiredFieldSnafu {
516                name: "manifest_info",
517            })?;
518
519        let manifest_info = match manifest_info {
520            ManifestInfo::MitoManifestInfo(info) => {
521                RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
522            }
523            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
524                info.data_manifest_version,
525                0,
526                info.metadata_manifest_version,
527                0,
528            ),
529        };
530
531        let tracing_context = TracingContext::from_current_span();
532        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
533
534        self.sync_region(region_id, manifest_info)
535            .trace(span)
536            .await
537            .map(|_| RegionResponse::new(AffectedRows::default()))
538    }
539
540    /// Handles the ListMetadata request and retrieves metadata for specified regions.
541    ///
542    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
543    /// non-existing regions as `null`.
544    #[tracing::instrument(skip_all)]
545    async fn handle_list_metadata_request(
546        &self,
547        request: &ListMetadataRequest,
548    ) -> Result<RegionResponse> {
549        let mut region_metadatas = Vec::new();
550        // Collect metadata for each region
551        for region_id in &request.region_ids {
552            let region_id = RegionId::from_u64(*region_id);
553            // Get the engine.
554            let Some(engine) = self.find_engine(region_id)? else {
555                region_metadatas.push(None);
556                continue;
557            };
558
559            match engine.get_metadata(region_id).await {
560                Ok(metadata) => region_metadatas.push(Some(metadata)),
561                Err(err) => {
562                    if err.status_code() == StatusCode::RegionNotFound {
563                        region_metadatas.push(None);
564                    } else {
565                        Err(err).with_context(|_| GetRegionMetadataSnafu {
566                            engine: engine.name(),
567                            region_id,
568                        })?;
569                    }
570                }
571            }
572        }
573
574        // Serialize metadata to JSON
575        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
576
577        let response = RegionResponse::from_metadata(json_result);
578
579        Ok(response)
580    }
581
582    /// Sync region manifest and registers new opened logical regions.
583    pub async fn sync_region(
584        &self,
585        region_id: RegionId,
586        manifest_info: RegionManifestInfo,
587    ) -> Result<()> {
588        let engine_with_status = self
589            .inner
590            .region_map
591            .get(&region_id)
592            .with_context(|| RegionNotFoundSnafu { region_id })?;
593
594        self.inner
595            .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
596            .await
597    }
598}
599
600#[async_trait]
601impl RegionServerHandler for RegionServer {
602    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
603        let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
604            .with_label_values(&[request.as_ref()]);
605        let response = match &request {
606            region_request::Body::Creates(_)
607            | region_request::Body::Drops(_)
608            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
609            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
610                self.handle_requests_in_parallel(request).await
611            }
612            region_request::Body::Sync(sync_request) => {
613                self.handle_sync_region_request(sync_request).await
614            }
615            region_request::Body::ListMetadata(list_metadata_request) => {
616                self.handle_list_metadata_request(list_metadata_request)
617                    .await
618            }
619            _ => self.handle_requests_in_serial(request).await,
620        }
621        .map_err(BoxedError::new)
622        .inspect_err(|_| {
623            failed_requests_cnt.inc();
624        })
625        .context(ExecuteGrpcRequestSnafu)?;
626
627        Ok(RegionResponseV1 {
628            header: Some(ResponseHeader {
629                status: Some(Status {
630                    status_code: StatusCode::Success as _,
631                    ..Default::default()
632                }),
633            }),
634            affected_rows: response.affected_rows as _,
635            extensions: response.extensions,
636            metadata: response.metadata,
637        })
638    }
639}
640
641#[async_trait]
642impl FlightCraft for RegionServer {
643    async fn do_get(
644        &self,
645        request: Request<Ticket>,
646    ) -> TonicResult<Response<TonicStream<FlightData>>> {
647        let ticket = request.into_inner().ticket;
648        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
649            .context(servers_error::InvalidFlightTicketSnafu)?;
650        let tracing_context = request
651            .header
652            .as_ref()
653            .map(|h| TracingContext::from_w3c(&h.tracing_context))
654            .unwrap_or_default();
655        let query_ctx = request
656            .header
657            .as_ref()
658            .map(|h| Arc::new(QueryContext::from(h)))
659            .unwrap_or(QueryContext::arc());
660
661        let result = self
662            .handle_remote_read(request, query_ctx.clone())
663            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
664            .await?;
665
666        let stream = Box::pin(FlightRecordBatchStream::new(
667            result,
668            tracing_context,
669            self.flight_compression,
670            query_ctx,
671        ));
672        Ok(Response::new(stream))
673    }
674}
675
/// A region's engine together with the region's lifecycle state in the region map.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
685
686impl RegionEngineWithStatus {
687    /// Returns [RegionEngineRef].
688    pub fn into_engine(self) -> RegionEngineRef {
689        match self {
690            RegionEngineWithStatus::Registering(engine) => engine,
691            RegionEngineWithStatus::Deregistering(engine) => engine,
692            RegionEngineWithStatus::Ready(engine) => engine,
693        }
694    }
695
696    /// Returns [RegionEngineRef] reference.
697    pub fn engine(&self) -> &RegionEngineRef {
698        match self {
699            RegionEngineWithStatus::Registering(engine) => engine,
700            RegionEngineWithStatus::Deregistering(engine) => engine,
701            RegionEngineWithStatus::Ready(engine) => engine,
702        }
703    }
704}
705
706impl Deref for RegionEngineWithStatus {
707    type Target = RegionEngineRef;
708
709    fn deref(&self) -> &Self::Target {
710        match self {
711            RegionEngineWithStatus::Registering(engine) => engine,
712            RegionEngineWithStatus::Deregistering(engine) => engine,
713            RegionEngineWithStatus::Ready(engine) => engine,
714        }
715    }
716}
717
/// State shared by all clones of [`RegionServer`].
struct RegionServerInner {
    /// Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Engine and lifecycle state of every region known to this server.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine; used to create the plan decoder for remote reads.
    query_engine: QueryEngineRef,
    /// Runtime handle exposed through `RegionServer::runtime`.
    runtime: Runtime,
    /// Listener for region server events.
    /// NOTE(review): not referenced in the visible part of this file; confirm its callers.
    event_listener: RegionServerEventListenerRef,
    /// Factory that creates a table provider for a region and its engine.
    table_provider_factory: TableProviderFactoryRef,
    /// The number of queries allowed to be executed at the same time.
    /// Act as last line of defense on datanode to prevent query overloading.
    /// `None` when the configured limit is zero.
    parallelism: Option<RegionServerParallelism>,
    /// The topic stats reporter.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    /// server with a concrete engine; acceptable for now to fetch Mito-specific
    /// info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
}
735
/// Limiter for the number of queries that may run at the same time.
struct RegionServerParallelism {
    /// Semaphore created with one permit per allowed concurrent query.
    semaphore: Semaphore,
    /// Maximum time `acquire` waits for a permit before it fails.
    timeout: Duration,
}
740
741impl RegionServerParallelism {
742    pub fn from_opts(
743        max_concurrent_queries: usize,
744        concurrent_query_limiter_timeout: Duration,
745    ) -> Option<Self> {
746        if max_concurrent_queries == 0 {
747            return None;
748        }
749        Some(RegionServerParallelism {
750            semaphore: Semaphore::new(max_concurrent_queries),
751            timeout: concurrent_query_limiter_timeout,
752        })
753    }
754
755    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
756        timeout(self.timeout, self.semaphore.acquire())
757            .await
758            .context(ConcurrentQueryLimiterTimeoutSnafu)?
759            .context(ConcurrentQueryLimiterClosedSnafu)
760    }
761}
762
/// Outcome of resolving the engine for a region request.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// No engine needs to be called; the request completes with these affected rows.
    EarlyReturn(AffectedRows),
}
767
768impl Debug for CurrentEngine {
769    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
770        match self {
771            CurrentEngine::Engine(engine) => f
772                .debug_struct("CurrentEngine")
773                .field("engine", &engine.name())
774                .finish(),
775            CurrentEngine::EarlyReturn(rows) => f
776                .debug_struct("CurrentEngine")
777                .field("return", rows)
778                .finish(),
779        }
780    }
781}
782
783impl RegionServerInner {
784    pub fn new(
785        query_engine: QueryEngineRef,
786        runtime: Runtime,
787        event_listener: RegionServerEventListenerRef,
788        table_provider_factory: TableProviderFactoryRef,
789        parallelism: Option<RegionServerParallelism>,
790    ) -> Self {
791        Self {
792            engines: RwLock::new(HashMap::new()),
793            region_map: DashMap::new(),
794            query_engine,
795            runtime,
796            event_listener,
797            table_provider_factory,
798            parallelism,
799            topic_stats_reporter: RwLock::new(None),
800            mito_engine: RwLock::new(None),
801        }
802    }
803
804    pub fn register_engine(&self, engine: RegionEngineRef) {
805        let engine_name = engine.name();
806        if engine_name == MITO_ENGINE_NAME
807            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
808        {
809            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
810        }
811
812        info!("Region Engine {engine_name} is registered");
813        self.engines
814            .write()
815            .unwrap()
816            .insert(engine_name.to_string(), engine);
817    }
818
819    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
820        info!("Set topic stats reporter");
821        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
822    }
823
824    fn get_engine(
825        &self,
826        region_id: RegionId,
827        region_change: &RegionChange,
828    ) -> Result<CurrentEngine> {
829        let current_region_status = self.region_map.get(&region_id);
830
831        let engine = match region_change {
832            RegionChange::Register(attribute) => match current_region_status {
833                Some(status) => match status.clone() {
834                    RegionEngineWithStatus::Registering(engine) => engine,
835                    RegionEngineWithStatus::Deregistering(_) => {
836                        return error::RegionBusySnafu { region_id }.fail();
837                    }
838                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
839                },
840                _ => self
841                    .engines
842                    .read()
843                    .unwrap()
844                    .get(attribute.engine())
845                    .with_context(|| RegionEngineNotFoundSnafu {
846                        name: attribute.engine(),
847                    })?
848                    .clone(),
849            },
850            RegionChange::Deregisters => match current_region_status {
851                Some(status) => match status.clone() {
852                    RegionEngineWithStatus::Registering(_) => {
853                        return error::RegionBusySnafu { region_id }.fail();
854                    }
855                    RegionEngineWithStatus::Deregistering(_) => {
856                        return Ok(CurrentEngine::EarlyReturn(0));
857                    }
858                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
859                },
860                None => return Ok(CurrentEngine::EarlyReturn(0)),
861            },
862            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
863                match current_region_status {
864                    Some(status) => match status.clone() {
865                        RegionEngineWithStatus::Registering(_) => {
866                            return error::RegionNotReadySnafu { region_id }.fail();
867                        }
868                        RegionEngineWithStatus::Deregistering(_) => {
869                            return error::RegionNotFoundSnafu { region_id }.fail();
870                        }
871                        RegionEngineWithStatus::Ready(engine) => engine,
872                    },
873                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
874                }
875            }
876        };
877
878        Ok(CurrentEngine::Engine(engine))
879    }
880
881    async fn handle_batch_open_requests_inner(
882        &self,
883        engine: RegionEngineRef,
884        parallelism: usize,
885        requests: Vec<(RegionId, RegionOpenRequest)>,
886        ignore_nonexistent_region: bool,
887    ) -> Result<Vec<RegionId>> {
888        let region_changes = requests
889            .iter()
890            .map(|(region_id, open)| {
891                let attribute = parse_region_attribute(&open.engine, &open.options)?;
892                Ok((*region_id, RegionChange::Register(attribute)))
893            })
894            .collect::<Result<HashMap<_, _>>>()?;
895
896        for (&region_id, region_change) in &region_changes {
897            self.set_region_status_not_ready(region_id, &engine, region_change)
898        }
899
900        let mut open_regions = Vec::with_capacity(requests.len());
901        let mut errors = vec![];
902        match engine
903            .handle_batch_open_requests(parallelism, requests)
904            .await
905            .with_context(|_| HandleBatchOpenRequestSnafu)
906        {
907            Ok(results) => {
908                for (region_id, result) in results {
909                    let region_change = &region_changes[&region_id];
910                    match result {
911                        Ok(_) => {
912                            if let Err(e) = self
913                                .set_region_status_ready(region_id, engine.clone(), *region_change)
914                                .await
915                            {
916                                error!(e; "Failed to set region to ready: {}", region_id);
917                                errors.push(BoxedError::new(e));
918                            } else {
919                                open_regions.push(region_id)
920                            }
921                        }
922                        Err(e) => {
923                            self.unset_region_status(region_id, &engine, *region_change);
924                            if e.status_code() == StatusCode::RegionNotFound
925                                && ignore_nonexistent_region
926                            {
927                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
928                            } else {
929                                error!(e; "Failed to open region: {}", region_id);
930                                errors.push(e);
931                            }
932                        }
933                    }
934                }
935            }
936            Err(e) => {
937                for (&region_id, region_change) in &region_changes {
938                    self.unset_region_status(region_id, &engine, *region_change);
939                }
940                error!(e; "Failed to open batch regions");
941                errors.push(BoxedError::new(e));
942            }
943        }
944
945        if !errors.is_empty() {
946            return error::UnexpectedSnafu {
947                // Returns the first error.
948                violated: format!("Failed to open batch regions: {:?}", errors[0]),
949            }
950            .fail();
951        }
952
953        Ok(open_regions)
954    }
955
956    pub async fn handle_batch_open_requests(
957        &self,
958        parallelism: usize,
959        requests: Vec<(RegionId, RegionOpenRequest)>,
960        ignore_nonexistent_region: bool,
961    ) -> Result<Vec<RegionId>> {
962        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
963            HashMap::with_capacity(requests.len());
964        for (region_id, request) in requests {
965            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
966                requests.push((region_id, request));
967            } else {
968                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
969            }
970        }
971
972        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
973        for (engine, requests) in engine_grouped_requests {
974            let engine = self
975                .engines
976                .read()
977                .unwrap()
978                .get(&engine)
979                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
980                .clone();
981            results.push(
982                self.handle_batch_open_requests_inner(
983                    engine,
984                    parallelism,
985                    requests,
986                    ignore_nonexistent_region,
987                )
988                .await,
989            )
990        }
991
992        Ok(results
993            .into_iter()
994            .collect::<Result<Vec<_>>>()?
995            .into_iter()
996            .flatten()
997            .collect::<Vec<_>>())
998    }
999
1000    pub async fn handle_batch_catchup_requests_inner(
1001        &self,
1002        engine: RegionEngineRef,
1003        parallelism: usize,
1004        requests: Vec<(RegionId, RegionCatchupRequest)>,
1005    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1006        for (region_id, _) in &requests {
1007            self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1008        }
1009        let region_ids = requests
1010            .iter()
1011            .map(|(region_id, _)| *region_id)
1012            .collect::<Vec<_>>();
1013        let mut responses = Vec::with_capacity(requests.len());
1014        match engine
1015            .handle_batch_catchup_requests(parallelism, requests)
1016            .await
1017        {
1018            Ok(results) => {
1019                for (region_id, result) in results {
1020                    match result {
1021                        Ok(_) => {
1022                            if let Err(e) = self
1023                                .set_region_status_ready(
1024                                    region_id,
1025                                    engine.clone(),
1026                                    RegionChange::Catchup,
1027                                )
1028                                .await
1029                            {
1030                                error!(e; "Failed to set region to ready: {}", region_id);
1031                                responses.push((region_id, Err(BoxedError::new(e))));
1032                            } else {
1033                                responses.push((region_id, Ok(())));
1034                            }
1035                        }
1036                        Err(e) => {
1037                            self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1038                            error!(e; "Failed to catchup region: {}", region_id);
1039                            responses.push((region_id, Err(e)));
1040                        }
1041                    }
1042                }
1043            }
1044            Err(e) => {
1045                for region_id in region_ids {
1046                    self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1047                }
1048                error!(e; "Failed to catchup batch regions");
1049                return error::UnexpectedSnafu {
1050                    violated: format!("Failed to catchup batch regions: {:?}", e),
1051                }
1052                .fail();
1053            }
1054        }
1055
1056        Ok(responses)
1057    }
1058
1059    pub async fn handle_batch_catchup_requests(
1060        &self,
1061        parallelism: usize,
1062        requests: Vec<(RegionId, RegionCatchupRequest)>,
1063    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1064        let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1065
1066        let mut responses = Vec::with_capacity(requests.len());
1067        for (region_id, request) in requests {
1068            if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1069                match engine {
1070                    CurrentEngine::Engine(engine) => {
1071                        engine_grouped_requests
1072                            .entry(engine.name().to_string())
1073                            .or_default()
1074                            .push((region_id, request));
1075                    }
1076                    CurrentEngine::EarlyReturn(_) => {
1077                        return error::UnexpectedSnafu {
1078                            violated: format!("Unexpected engine type for region {}", region_id),
1079                        }
1080                        .fail();
1081                    }
1082                }
1083            } else {
1084                responses.push((
1085                    region_id,
1086                    Err(BoxedError::new(
1087                        error::RegionNotFoundSnafu { region_id }.build(),
1088                    )),
1089                ));
1090            }
1091        }
1092
1093        for (engine, requests) in engine_grouped_requests {
1094            let engine = self
1095                .engines
1096                .read()
1097                .unwrap()
1098                .get(&engine)
1099                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1100                .clone();
1101            responses.extend(
1102                self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1103                    .await?,
1104            );
1105        }
1106
1107        Ok(responses)
1108    }
1109
1110    // Handle requests in batch.
1111    //
1112    // limitation: all create requests must be in the same engine.
1113    pub async fn handle_batch_request(
1114        &self,
1115        batch_request: BatchRegionDdlRequest,
1116    ) -> Result<RegionResponse> {
1117        let region_changes = match &batch_request {
1118            BatchRegionDdlRequest::Create(requests) => requests
1119                .iter()
1120                .map(|(region_id, create)| {
1121                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
1122                    Ok((*region_id, RegionChange::Register(attribute)))
1123                })
1124                .collect::<Result<Vec<_>>>()?,
1125            BatchRegionDdlRequest::Drop(requests) => requests
1126                .iter()
1127                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1128                .collect::<Vec<_>>(),
1129            BatchRegionDdlRequest::Alter(requests) => requests
1130                .iter()
1131                .map(|(region_id, _)| (*region_id, RegionChange::None))
1132                .collect::<Vec<_>>(),
1133        };
1134
1135        // The ddl procedure will ensure all requests are in the same engine.
1136        // Therefore, we can get the engine from the first request.
1137        let (first_region_id, first_region_change) = region_changes.first().unwrap();
1138        let engine = match self.get_engine(*first_region_id, first_region_change)? {
1139            CurrentEngine::Engine(engine) => engine,
1140            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1141        };
1142
1143        for (region_id, region_change) in region_changes.iter() {
1144            self.set_region_status_not_ready(*region_id, &engine, region_change);
1145        }
1146
1147        let ddl_type = batch_request.request_type();
1148        let result = engine
1149            .handle_batch_ddl_requests(batch_request)
1150            .await
1151            .context(HandleBatchDdlRequestSnafu { ddl_type });
1152
1153        match result {
1154            Ok(result) => {
1155                for (region_id, region_change) in &region_changes {
1156                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1157                        .await?;
1158                }
1159
1160                Ok(RegionResponse {
1161                    affected_rows: result.affected_rows,
1162                    extensions: result.extensions,
1163                    metadata: Vec::new(),
1164                })
1165            }
1166            Err(err) => {
1167                for (region_id, region_change) in region_changes {
1168                    self.unset_region_status(region_id, &engine, region_change);
1169                }
1170
1171                Err(err)
1172            }
1173        }
1174    }
1175
1176    pub async fn handle_request(
1177        &self,
1178        region_id: RegionId,
1179        request: RegionRequest,
1180    ) -> Result<RegionResponse> {
1181        let request_type = request.request_type();
1182        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1183            .with_label_values(&[request_type])
1184            .start_timer();
1185
1186        let region_change = match &request {
1187            RegionRequest::Create(create) => {
1188                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1189                RegionChange::Register(attribute)
1190            }
1191            RegionRequest::Open(open) => {
1192                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1193                RegionChange::Register(attribute)
1194            }
1195            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1196            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1197                RegionChange::Ingest
1198            }
1199            RegionRequest::Alter(_)
1200            | RegionRequest::Flush(_)
1201            | RegionRequest::Compact(_)
1202            | RegionRequest::Truncate(_)
1203            | RegionRequest::BuildIndex(_) => RegionChange::None,
1204            RegionRequest::Catchup(_) => RegionChange::Catchup,
1205        };
1206
1207        let engine = match self.get_engine(region_id, &region_change)? {
1208            CurrentEngine::Engine(engine) => engine,
1209            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1210        };
1211
1212        // Sets corresponding region status to registering/deregistering before the operation.
1213        self.set_region_status_not_ready(region_id, &engine, &region_change);
1214
1215        match engine
1216            .handle_request(region_id, request)
1217            .await
1218            .with_context(|_| HandleRegionRequestSnafu { region_id })
1219        {
1220            Ok(result) => {
1221                // Update metrics
1222                if matches!(region_change, RegionChange::Ingest) {
1223                    crate::metrics::REGION_CHANGED_ROW_COUNT
1224                        .with_label_values(&[request_type])
1225                        .inc_by(result.affected_rows as u64);
1226                }
1227                // Sets corresponding region status to ready.
1228                self.set_region_status_ready(region_id, engine.clone(), region_change)
1229                    .await?;
1230
1231                Ok(RegionResponse {
1232                    affected_rows: result.affected_rows,
1233                    extensions: result.extensions,
1234                    metadata: Vec::new(),
1235                })
1236            }
1237            Err(err) => {
1238                if matches!(region_change, RegionChange::Ingest) {
1239                    crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1240                        .with_label_values(&[request_type])
1241                        .inc();
1242                }
1243                // Removes the region status if the operation fails.
1244                self.unset_region_status(region_id, &engine, region_change);
1245                Err(err)
1246            }
1247        }
1248    }
1249
1250    /// Handles the sync region request.
1251    pub async fn handle_sync_region(
1252        &self,
1253        engine: &RegionEngineRef,
1254        region_id: RegionId,
1255        manifest_info: RegionManifestInfo,
1256    ) -> Result<()> {
1257        let Some(new_opened_regions) = engine
1258            .sync_region(region_id, manifest_info)
1259            .await
1260            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1261            .new_opened_logical_region_ids()
1262        else {
1263            warn!("No new opened logical regions");
1264            return Ok(());
1265        };
1266
1267        for region in new_opened_regions {
1268            self.region_map
1269                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1270            info!("Logical region {} is registered!", region);
1271        }
1272
1273        Ok(())
1274    }
1275
1276    fn set_region_status_not_ready(
1277        &self,
1278        region_id: RegionId,
1279        engine: &RegionEngineRef,
1280        region_change: &RegionChange,
1281    ) {
1282        match region_change {
1283            RegionChange::Register(_) => {
1284                self.region_map.insert(
1285                    region_id,
1286                    RegionEngineWithStatus::Registering(engine.clone()),
1287                );
1288            }
1289            RegionChange::Deregisters => {
1290                self.region_map.insert(
1291                    region_id,
1292                    RegionEngineWithStatus::Deregistering(engine.clone()),
1293                );
1294            }
1295            _ => {}
1296        }
1297    }
1298
1299    fn unset_region_status(
1300        &self,
1301        region_id: RegionId,
1302        engine: &RegionEngineRef,
1303        region_change: RegionChange,
1304    ) {
1305        match region_change {
1306            RegionChange::None | RegionChange::Ingest => {}
1307            RegionChange::Register(_) => {
1308                self.region_map.remove(&region_id);
1309            }
1310            RegionChange::Deregisters => {
1311                self.region_map
1312                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1313            }
1314            RegionChange::Catchup => {}
1315        }
1316    }
1317
1318    async fn set_region_status_ready(
1319        &self,
1320        region_id: RegionId,
1321        engine: RegionEngineRef,
1322        region_change: RegionChange,
1323    ) -> Result<()> {
1324        let engine_type = engine.name();
1325        match region_change {
1326            RegionChange::None | RegionChange::Ingest => {}
1327            RegionChange::Register(attribute) => {
1328                info!(
1329                    "Region {region_id} is registered to engine {}",
1330                    attribute.engine()
1331                );
1332                self.region_map
1333                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1334
1335                match attribute {
1336                    RegionAttribute::Metric { physical } => {
1337                        if physical {
1338                            // Registers the logical regions belong to the physical region (`region_id`).
1339                            self.register_logical_regions(&engine, region_id).await?;
1340                            // We only send the `on_region_registered` event of the physical region.
1341                            self.event_listener.on_region_registered(region_id);
1342                        }
1343                    }
1344                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1345                    RegionAttribute::File => {
1346                        // do nothing
1347                    }
1348                }
1349            }
1350            RegionChange::Deregisters => {
1351                info!("Region {region_id} is deregistered from engine {engine_type}");
1352                self.region_map
1353                    .remove(&region_id)
1354                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1355                self.event_listener.on_region_deregistered(region_id);
1356            }
1357            RegionChange::Catchup => {
1358                if is_metric_engine(engine.name()) {
1359                    // Registers the logical regions belong to the physical region (`region_id`).
1360                    self.register_logical_regions(&engine, region_id).await?;
1361                }
1362            }
1363        }
1364        Ok(())
1365    }
1366
1367    async fn register_logical_regions(
1368        &self,
1369        engine: &RegionEngineRef,
1370        physical_region_id: RegionId,
1371    ) -> Result<()> {
1372        let metric_engine =
1373            engine
1374                .as_any()
1375                .downcast_ref::<MetricEngine>()
1376                .context(UnexpectedSnafu {
1377                    violated: format!(
1378                        "expecting engine type '{}', actual '{}'",
1379                        METRIC_ENGINE_NAME,
1380                        engine.name(),
1381                    ),
1382                })?;
1383
1384        let logical_regions = metric_engine
1385            .logical_regions(physical_region_id)
1386            .await
1387            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1388
1389        for region in logical_regions {
1390            self.region_map
1391                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1392            info!("Logical region {} is registered!", region);
1393        }
1394        Ok(())
1395    }
1396
1397    pub async fn handle_read(
1398        &self,
1399        request: QueryRequest,
1400        query_ctx: QueryContextRef,
1401    ) -> Result<SendableRecordBatchStream> {
1402        // TODO(ruihang): add metrics and set trace id
1403
1404        let result = self
1405            .query_engine
1406            .execute(request.plan, query_ctx)
1407            .await
1408            .context(ExecuteLogicalPlanSnafu)?;
1409
1410        match result.data {
1411            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1412                UnsupportedOutputSnafu { expected: "stream" }.fail()
1413            }
1414            OutputData::Stream(stream) => Ok(stream),
1415        }
1416    }
1417
1418    async fn stop(&self) -> Result<()> {
1419        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1420        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1421        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1422        //
1423        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1424        // it here, collect the values first then use later separately.
1425
1426        let regions = self
1427            .region_map
1428            .iter()
1429            .map(|x| (*x.key(), x.value().clone()))
1430            .collect::<Vec<_>>();
1431        let num_regions = regions.len();
1432
1433        for (region_id, engine) in regions {
1434            let closed = engine
1435                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1436                .await;
1437            match closed {
1438                Ok(_) => debug!("Region {region_id} is closed"),
1439                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1440            }
1441        }
1442        self.region_map.clear();
1443        info!("closed {num_regions} regions");
1444
1445        drop(self.mito_engine.write().unwrap().take());
1446        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1447        for (engine_name, engine) in engines {
1448            engine
1449                .stop()
1450                .await
1451                .context(StopRegionEngineSnafu { name: &engine_name })?;
1452            info!("Region engine {engine_name} is stopped");
1453        }
1454
1455        Ok(())
1456    }
1457}
1458
/// The effect a request has on a region's entry in the region map.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// No status transition; used for alter, flush, compact, truncate and
    /// build-index requests.
    None,
    /// The region is being registered (create/open requests); carries the
    /// attribute parsed from the request's engine name and options.
    Register(RegionAttribute),
    /// The region is being deregistered (close/drop requests).
    Deregisters,
    /// A catchup request; the region status itself is not changed.
    Catchup,
    /// A data-writing request (put, delete, bulk inserts); the region status
    /// itself is not changed.
    Ingest,
}
1467
/// Returns `true` when `engine` is the name of the metric engine.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1471
1472fn parse_region_attribute(
1473    engine: &str,
1474    options: &HashMap<String, String>,
1475) -> Result<RegionAttribute> {
1476    match engine {
1477        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1478        METRIC_ENGINE_NAME => {
1479            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1480
1481            Ok(RegionAttribute::Metric { physical })
1482        }
1483        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1484        _ => error::UnexpectedSnafu {
1485            violated: format!("Unknown engine: {}", engine),
1486        }
1487        .fail(),
1488    }
1489}
1490
/// Engine-specific attribute of a region, parsed from a create/open request.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region served by the mito engine.
    Mito,
    /// A region served by the metric engine; `physical` is `true` when the
    /// request options do not contain `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region served by the file engine.
    File,
}
1497
1498impl RegionAttribute {
1499    fn engine(&self) -> &'static str {
1500        match self {
1501            RegionAttribute::Mito => MITO_ENGINE_NAME,
1502            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1503            RegionAttribute::File => FILE_ENGINE_NAME,
1504        }
1505    }
1506}
1507
1508#[cfg(test)]
1509mod tests {
1510
1511    use std::assert_matches::assert_matches;
1512
1513    use api::v1::SemanticType;
1514    use common_error::ext::ErrorExt;
1515    use datatypes::prelude::ConcreteDataType;
1516    use mito2::test_util::CreateRequestBuilder;
1517    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1518    use store_api::region_engine::RegionEngine;
1519    use store_api::region_request::{
1520        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1521    };
1522    use store_api::storage::RegionId;
1523
1524    use super::*;
1525    use crate::error::Result;
1526    use crate::tests::{MockRegionEngine, mock_region_server};
1527
    /// A create or open request for a region that is already in the
    /// `Registering` state must still succeed and leave the region `Ready`.
    #[tokio::test]
    async fn test_region_registering() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
        let engine_name = engine.name();
        mock_region_server.register_engine(engine.clone());
        let region_id = RegionId::new(1, 1);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        // Tries to create/open a registering region.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Registering(engine.clone()),
        );
        let response = mock_region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));

        // Put the region back into the registering state and repeat with an open request.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Registering(engine.clone()),
        );
        let response = mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Open(RegionOpenRequest {
                    engine: engine_name.to_string(),
                    table_dir: String::new(),
                    path_type: PathType::Bare,
                    options: Default::default(),
                    skip_wal_replay: false,
                    checkpoint: None,
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
    }
1584
    /// A drop or close request for a region that is already in the
    /// `Deregistering` state returns early with zero affected rows and leaves
    /// the region's status untouched.
    #[tokio::test]
    async fn test_region_deregistering() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1, 1);

        // Tries to drop/close a deregistering region.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Deregistering(engine.clone()),
        );

        let response = mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);

        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));

        // Repeat with a close request.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Deregistering(engine.clone()),
        );

        let response = mock_region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);

        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
    }
1638
1639    #[tokio::test]
1640    async fn test_region_not_ready() {
1641        common_telemetry::init_default_ut_logging();
1642
1643        let mut mock_region_server = mock_region_server();
1644        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1645
1646        mock_region_server.register_engine(engine.clone());
1647
1648        let region_id = RegionId::new(1, 1);
1649
1650        // Tries to drop/close a registering region.
1651        mock_region_server.inner.region_map.insert(
1652            region_id,
1653            RegionEngineWithStatus::Registering(engine.clone()),
1654        );
1655
1656        let err = mock_region_server
1657            .handle_request(
1658                region_id,
1659                RegionRequest::Truncate(RegionTruncateRequest::All),
1660            )
1661            .await
1662            .unwrap_err();
1663
1664        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1665    }
1666
1667    #[tokio::test]
1668    async fn test_region_request_failed() {
1669        common_telemetry::init_default_ut_logging();
1670
1671        let mut mock_region_server = mock_region_server();
1672        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1673            MITO_ENGINE_NAME,
1674            Box::new(|_region_id, _request| {
1675                error::UnexpectedSnafu {
1676                    violated: "test".to_string(),
1677                }
1678                .fail()
1679            }),
1680        );
1681
1682        mock_region_server.register_engine(engine.clone());
1683
1684        let region_id = RegionId::new(1, 1);
1685        let builder = CreateRequestBuilder::new();
1686        let create_req = builder.build();
1687        mock_region_server
1688            .handle_request(region_id, RegionRequest::Create(create_req))
1689            .await
1690            .unwrap_err();
1691
1692        let status = mock_region_server.inner.region_map.get(&region_id);
1693        assert!(status.is_none());
1694
1695        mock_region_server
1696            .inner
1697            .region_map
1698            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1699
1700        mock_region_server
1701            .handle_request(
1702                region_id,
1703                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1704            )
1705            .await
1706            .unwrap_err();
1707
1708        let status = mock_region_server.inner.region_map.get(&region_id);
1709        assert!(status.is_some());
1710    }
1711
1712    #[tokio::test]
1713    async fn test_batch_open_region_ignore_nonexistent_regions() {
1714        common_telemetry::init_default_ut_logging();
1715        let mut mock_region_server = mock_region_server();
1716        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1717            MITO_ENGINE_NAME,
1718            Box::new(|region_id, _request| {
1719                if region_id == RegionId::new(1, 1) {
1720                    error::RegionNotFoundSnafu { region_id }.fail()
1721                } else {
1722                    Ok(0)
1723                }
1724            }),
1725        );
1726        mock_region_server.register_engine(engine.clone());
1727
1728        let region_ids = mock_region_server
1729            .handle_batch_open_requests(
1730                8,
1731                vec![
1732                    (
1733                        RegionId::new(1, 1),
1734                        RegionOpenRequest {
1735                            engine: MITO_ENGINE_NAME.to_string(),
1736                            table_dir: String::new(),
1737                            path_type: PathType::Bare,
1738                            options: Default::default(),
1739                            skip_wal_replay: false,
1740                            checkpoint: None,
1741                        },
1742                    ),
1743                    (
1744                        RegionId::new(1, 2),
1745                        RegionOpenRequest {
1746                            engine: MITO_ENGINE_NAME.to_string(),
1747                            table_dir: String::new(),
1748                            path_type: PathType::Bare,
1749                            options: Default::default(),
1750                            skip_wal_replay: false,
1751                            checkpoint: None,
1752                        },
1753                    ),
1754                ],
1755                true,
1756            )
1757            .await
1758            .unwrap();
1759        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1760
1761        let err = mock_region_server
1762            .handle_batch_open_requests(
1763                8,
1764                vec![
1765                    (
1766                        RegionId::new(1, 1),
1767                        RegionOpenRequest {
1768                            engine: MITO_ENGINE_NAME.to_string(),
1769                            table_dir: String::new(),
1770                            path_type: PathType::Bare,
1771                            options: Default::default(),
1772                            skip_wal_replay: false,
1773                            checkpoint: None,
1774                        },
1775                    ),
1776                    (
1777                        RegionId::new(1, 2),
1778                        RegionOpenRequest {
1779                            engine: MITO_ENGINE_NAME.to_string(),
1780                            table_dir: String::new(),
1781                            path_type: PathType::Bare,
1782                            options: Default::default(),
1783                            skip_wal_replay: false,
1784                            checkpoint: None,
1785                        },
1786                    ),
1787                ],
1788                false,
1789            )
1790            .await
1791            .unwrap_err();
1792        assert_eq!(err.status_code(), StatusCode::Unexpected);
1793    }
1794
    /// One table-driven case for `test_current_engine`.
    struct CurrentEngineTest {
        /// Region the engine lookup is performed for.
        region_id: RegionId,
        /// Status placed in the region map before the lookup; `None` means the
        /// region's entry is removed from the map instead.
        current_region_status: Option<RegionEngineWithStatus>,
        /// Region change passed to `get_engine`.
        region_change: RegionChange,
        /// Check applied to the result of the lookup.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
1801
1802    #[tokio::test]
1803    async fn test_current_engine() {
1804        common_telemetry::init_default_ut_logging();
1805
1806        let mut mock_region_server = mock_region_server();
1807        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1808        mock_region_server.register_engine(engine.clone());
1809
1810        let region_id = RegionId::new(1024, 1);
1811        let tests = vec![
1812            // RegionChange::None
1813            CurrentEngineTest {
1814                region_id,
1815                current_region_status: None,
1816                region_change: RegionChange::None,
1817                assert: Box::new(|result| {
1818                    let err = result.unwrap_err();
1819                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1820                }),
1821            },
1822            CurrentEngineTest {
1823                region_id,
1824                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1825                region_change: RegionChange::None,
1826                assert: Box::new(|result| {
1827                    let current_engine = result.unwrap();
1828                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1829                }),
1830            },
1831            CurrentEngineTest {
1832                region_id,
1833                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1834                region_change: RegionChange::None,
1835                assert: Box::new(|result| {
1836                    let err = result.unwrap_err();
1837                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1838                }),
1839            },
1840            CurrentEngineTest {
1841                region_id,
1842                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1843                region_change: RegionChange::None,
1844                assert: Box::new(|result| {
1845                    let err = result.unwrap_err();
1846                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1847                }),
1848            },
1849            // RegionChange::Register
1850            CurrentEngineTest {
1851                region_id,
1852                current_region_status: None,
1853                region_change: RegionChange::Register(RegionAttribute::Mito),
1854                assert: Box::new(|result| {
1855                    let current_engine = result.unwrap();
1856                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1857                }),
1858            },
1859            CurrentEngineTest {
1860                region_id,
1861                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1862                region_change: RegionChange::Register(RegionAttribute::Mito),
1863                assert: Box::new(|result| {
1864                    let current_engine = result.unwrap();
1865                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1866                }),
1867            },
1868            CurrentEngineTest {
1869                region_id,
1870                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1871                region_change: RegionChange::Register(RegionAttribute::Mito),
1872                assert: Box::new(|result| {
1873                    let err = result.unwrap_err();
1874                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1875                }),
1876            },
1877            CurrentEngineTest {
1878                region_id,
1879                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1880                region_change: RegionChange::Register(RegionAttribute::Mito),
1881                assert: Box::new(|result| {
1882                    let current_engine = result.unwrap();
1883                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1884                }),
1885            },
1886            // RegionChange::Deregister
1887            CurrentEngineTest {
1888                region_id,
1889                current_region_status: None,
1890                region_change: RegionChange::Deregisters,
1891                assert: Box::new(|result| {
1892                    let current_engine = result.unwrap();
1893                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1894                }),
1895            },
1896            CurrentEngineTest {
1897                region_id,
1898                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1899                region_change: RegionChange::Deregisters,
1900                assert: Box::new(|result| {
1901                    let err = result.unwrap_err();
1902                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1903                }),
1904            },
1905            CurrentEngineTest {
1906                region_id,
1907                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1908                region_change: RegionChange::Deregisters,
1909                assert: Box::new(|result| {
1910                    let current_engine = result.unwrap();
1911                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1912                }),
1913            },
1914            CurrentEngineTest {
1915                region_id,
1916                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1917                region_change: RegionChange::Deregisters,
1918                assert: Box::new(|result| {
1919                    let current_engine = result.unwrap();
1920                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1921                }),
1922            },
1923        ];
1924
1925        for test in tests {
1926            let CurrentEngineTest {
1927                region_id,
1928                current_region_status,
1929                region_change,
1930                assert,
1931            } = test;
1932
1933            // Sets up
1934            if let Some(status) = current_region_status {
1935                mock_region_server
1936                    .inner
1937                    .region_map
1938                    .insert(region_id, status);
1939            } else {
1940                mock_region_server.inner.region_map.remove(&region_id);
1941            }
1942
1943            let result = mock_region_server
1944                .inner
1945                .get_engine(region_id, &region_change);
1946
1947            assert(result);
1948        }
1949    }
1950
1951    #[tokio::test]
1952    async fn test_region_server_parallelism() {
1953        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1954        let first_query = p.acquire().await;
1955        assert!(first_query.is_ok());
1956        let second_query = p.acquire().await;
1957        assert!(second_query.is_ok());
1958        let third_query = p.acquire().await;
1959        assert!(third_query.is_err());
1960        let err = third_query.unwrap_err();
1961        assert_eq!(
1962            err.output_msg(),
1963            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1964        );
1965        drop(first_query);
1966        let forth_query = p.acquire().await;
1967        assert!(forth_query.is_ok());
1968    }
1969
1970    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1971        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1972        metadata_builder.push_column_metadata(ColumnMetadata {
1973            column_schema: datatypes::schema::ColumnSchema::new(
1974                "timestamp",
1975                ConcreteDataType::timestamp_nanosecond_datatype(),
1976                false,
1977            ),
1978            semantic_type: SemanticType::Timestamp,
1979            column_id: 0,
1980        });
1981        metadata_builder.push_column_metadata(ColumnMetadata {
1982            column_schema: datatypes::schema::ColumnSchema::new(
1983                "file",
1984                ConcreteDataType::string_datatype(),
1985                true,
1986            ),
1987            semantic_type: SemanticType::Tag,
1988            column_id: 1,
1989        });
1990        metadata_builder.push_column_metadata(ColumnMetadata {
1991            column_schema: datatypes::schema::ColumnSchema::new(
1992                "message",
1993                ConcreteDataType::string_datatype(),
1994                true,
1995            ),
1996            semantic_type: SemanticType::Field,
1997            column_id: 2,
1998        });
1999        metadata_builder.primary_key(vec![1]);
2000        metadata_builder.build().unwrap()
2001    }
2002
2003    #[tokio::test]
2004    async fn test_handle_list_metadata_request() {
2005        common_telemetry::init_default_ut_logging();
2006
2007        let mut mock_region_server = mock_region_server();
2008        let region_id_1 = RegionId::new(1, 0);
2009        let region_id_2 = RegionId::new(2, 0);
2010
2011        let metadata_1 = mock_region_metadata(region_id_1);
2012        let metadata_2 = mock_region_metadata(region_id_2);
2013        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2014
2015        let metadata_1 = Arc::new(metadata_1);
2016        let metadata_2 = Arc::new(metadata_2);
2017        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2018            MITO_ENGINE_NAME,
2019            Box::new(move |region_id| {
2020                if region_id == region_id_1 {
2021                    Ok(metadata_1.clone())
2022                } else if region_id == region_id_2 {
2023                    Ok(metadata_2.clone())
2024                } else {
2025                    error::RegionNotFoundSnafu { region_id }.fail()
2026                }
2027            }),
2028        );
2029
2030        mock_region_server.register_engine(engine.clone());
2031        mock_region_server
2032            .inner
2033            .region_map
2034            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2035        mock_region_server
2036            .inner
2037            .region_map
2038            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2039
2040        // All regions exist.
2041        let list_metadata_request = ListMetadataRequest {
2042            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2043        };
2044        let response = mock_region_server
2045            .handle_list_metadata_request(&list_metadata_request)
2046            .await
2047            .unwrap();
2048        let decoded_metadata: Vec<Option<RegionMetadata>> =
2049            serde_json::from_slice(&response.metadata).unwrap();
2050        assert_eq!(metadatas, decoded_metadata);
2051    }
2052
2053    #[tokio::test]
2054    async fn test_handle_list_metadata_not_found() {
2055        common_telemetry::init_default_ut_logging();
2056
2057        let mut mock_region_server = mock_region_server();
2058        let region_id_1 = RegionId::new(1, 0);
2059        let region_id_2 = RegionId::new(2, 0);
2060
2061        let metadata_1 = mock_region_metadata(region_id_1);
2062        let metadatas = vec![Some(metadata_1.clone()), None];
2063
2064        let metadata_1 = Arc::new(metadata_1);
2065        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2066            MITO_ENGINE_NAME,
2067            Box::new(move |region_id| {
2068                if region_id == region_id_1 {
2069                    Ok(metadata_1.clone())
2070                } else {
2071                    error::RegionNotFoundSnafu { region_id }.fail()
2072                }
2073            }),
2074        );
2075
2076        mock_region_server.register_engine(engine.clone());
2077        mock_region_server
2078            .inner
2079            .region_map
2080            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2081
2082        // Not in region map.
2083        let list_metadata_request = ListMetadataRequest {
2084            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2085        };
2086        let response = mock_region_server
2087            .handle_list_metadata_request(&list_metadata_request)
2088            .await
2089            .unwrap();
2090        let decoded_metadata: Vec<Option<RegionMetadata>> =
2091            serde_json::from_slice(&response.metadata).unwrap();
2092        assert_eq!(metadatas, decoded_metadata);
2093
2094        // Not in region engine.
2095        mock_region_server
2096            .inner
2097            .region_map
2098            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2099        let response = mock_region_server
2100            .handle_list_metadata_request(&list_metadata_request)
2101            .await
2102            .unwrap();
2103        let decoded_metadata: Vec<Option<RegionMetadata>> =
2104            serde_json::from_slice(&response.metadata).unwrap();
2105        assert_eq!(metadatas, decoded_metadata);
2106    }
2107
2108    #[tokio::test]
2109    async fn test_handle_list_metadata_failed() {
2110        common_telemetry::init_default_ut_logging();
2111
2112        let mut mock_region_server = mock_region_server();
2113        let region_id_1 = RegionId::new(1, 0);
2114
2115        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2116            MITO_ENGINE_NAME,
2117            Box::new(move |region_id| {
2118                error::UnexpectedSnafu {
2119                    violated: format!("Failed to get region {region_id}"),
2120                }
2121                .fail()
2122            }),
2123        );
2124
2125        mock_region_server.register_engine(engine.clone());
2126        mock_region_server
2127            .inner
2128            .region_map
2129            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2130
2131        // Failed to get.
2132        let list_metadata_request = ListMetadataRequest {
2133            region_ids: vec![region_id_1.as_u64()],
2134        };
2135        mock_region_server
2136            .handle_list_metadata_request(&list_metadata_request)
2137            .await
2138            .unwrap_err();
2139    }
2140}