datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use either::Either;
48use futures_util::future::try_join_all;
49use metric_engine::engine::MetricEngine;
50use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
51use prost::Message;
52use query::QueryEngineRef;
53pub use query::dummy_catalog::{
54    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
55};
56use serde_json;
57use servers::error::{
58    self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
59};
60use servers::grpc::FlightCompression;
61use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
62use servers::grpc::region_server::RegionServerHandler;
63use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metric_engine_consts::{
66    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
67};
68use store_api::region_engine::{
69    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
70    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
71    SyncRegionFromRequest,
72};
73use store_api::region_request::{
74    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
75    RegionOpenRequest, RegionRequest,
76};
77use store_api::storage::RegionId;
78use tokio::sync::{Semaphore, SemaphorePermit};
79use tokio::time::timeout;
80use tonic::{Request, Response, Result as TonicResult};
81
82use crate::error::{
83    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
84    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
85    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
86    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
87    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
88    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
89};
90use crate::event_listener::RegionServerEventListenerRef;
91use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
92
93#[derive(Clone)]
94pub struct RegionServer {
95    inner: Arc<RegionServerInner>,
96    flight_compression: FlightCompression,
97    suspend: Arc<AtomicBool>,
98}
99
100pub struct RegionStat {
101    pub region_id: RegionId,
102    pub engine: String,
103    pub role: RegionRole,
104}
105
106impl RegionServer {
107    pub fn new(
108        query_engine: QueryEngineRef,
109        runtime: Runtime,
110        event_listener: RegionServerEventListenerRef,
111        flight_compression: FlightCompression,
112    ) -> Self {
113        Self::with_table_provider(
114            query_engine,
115            runtime,
116            event_listener,
117            Arc::new(DummyTableProviderFactory),
118            0,
119            Duration::from_millis(0),
120            flight_compression,
121        )
122    }
123
124    pub fn with_table_provider(
125        query_engine: QueryEngineRef,
126        runtime: Runtime,
127        event_listener: RegionServerEventListenerRef,
128        table_provider_factory: TableProviderFactoryRef,
129        max_concurrent_queries: usize,
130        concurrent_query_limiter_timeout: Duration,
131        flight_compression: FlightCompression,
132    ) -> Self {
133        Self {
134            inner: Arc::new(RegionServerInner::new(
135                query_engine,
136                runtime,
137                event_listener,
138                table_provider_factory,
139                RegionServerParallelism::from_opts(
140                    max_concurrent_queries,
141                    concurrent_query_limiter_timeout,
142                ),
143            )),
144            flight_compression,
145            suspend: Arc::new(AtomicBool::new(false)),
146        }
147    }
148
149    /// Registers an engine.
150    pub fn register_engine(&mut self, engine: RegionEngineRef) {
151        self.inner.register_engine(engine);
152    }
153
154    /// Sets the topic stats.
155    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
156        self.inner.set_topic_stats_reporter(topic_stats_reporter);
157    }
158
159    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
160    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
161        match self.inner.get_engine(region_id, &RegionChange::None) {
162            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
163            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
164            Err(error::Error::RegionNotFound { .. }) => Ok(None),
165            Err(err) => Err(err),
166        }
167    }
168
169    /// Gets the MitoEngine if it's registered.
170    pub fn mito_engine(&self) -> Option<MitoEngine> {
171        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
172            Some(mito)
173        } else {
174            self.inner
175                .engines
176                .read()
177                .unwrap()
178                .get(MITO_ENGINE_NAME)
179                .cloned()
180                .and_then(|e| {
181                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
182                    if mito.is_none() {
183                        warn!("Mito engine not found in region server engines");
184                    }
185                    mito
186                })
187        }
188    }
189
190    #[tracing::instrument(skip_all)]
191    pub async fn handle_batch_open_requests(
192        &self,
193        parallelism: usize,
194        requests: Vec<(RegionId, RegionOpenRequest)>,
195        ignore_nonexistent_region: bool,
196    ) -> Result<Vec<RegionId>> {
197        self.inner
198            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
199            .await
200    }
201
202    #[tracing::instrument(skip_all)]
203    pub async fn handle_batch_catchup_requests(
204        &self,
205        parallelism: usize,
206        requests: Vec<(RegionId, RegionCatchupRequest)>,
207    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
208        self.inner
209            .handle_batch_catchup_requests(parallelism, requests)
210            .await
211    }
212
213    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
214    pub async fn handle_request(
215        &self,
216        region_id: RegionId,
217        request: RegionRequest,
218    ) -> Result<RegionResponse> {
219        self.inner.handle_request(region_id, request).await
220    }
221
222    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
223    async fn table_provider(
224        &self,
225        region_id: RegionId,
226        ctx: Option<QueryContextRef>,
227    ) -> Result<Arc<dyn TableProvider>> {
228        let status = self
229            .inner
230            .region_map
231            .get(&region_id)
232            .context(RegionNotFoundSnafu { region_id })?
233            .clone();
234        ensure!(
235            matches!(status, RegionEngineWithStatus::Ready(_)),
236            RegionNotReadySnafu { region_id }
237        );
238
239        self.inner
240            .table_provider_factory
241            .create(region_id, status.into_engine(), ctx)
242            .await
243            .context(ExecuteLogicalPlanSnafu)
244    }
245
246    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
247    pub async fn handle_remote_read(
248        &self,
249        request: api::v1::region::QueryRequest,
250        query_ctx: QueryContextRef,
251    ) -> Result<SendableRecordBatchStream> {
252        let _permit = if let Some(p) = &self.inner.parallelism {
253            Some(p.acquire().await?)
254        } else {
255            None
256        };
257
258        let region_id = RegionId::from_u64(request.region_id);
259        let catalog_list = Arc::new(NameAwareCatalogList::new(
260            self.clone(),
261            region_id,
262            query_ctx.clone(),
263        ));
264
265        if query_ctx.explain_verbose() {
266            common_telemetry::info!("Handle remote read for region: {}", region_id);
267        }
268
269        let decoder = self
270            .inner
271            .query_engine
272            .engine_context(query_ctx.clone())
273            .new_plan_decoder()
274            .context(NewPlanDecoderSnafu)?;
275
276        let plan = decoder
277            .decode(Bytes::from(request.plan), catalog_list, false)
278            .await
279            .context(DecodeLogicalPlanSnafu)?;
280
281        self.inner
282            .handle_read(
283                QueryRequest {
284                    header: request.header,
285                    region_id,
286                    plan,
287                },
288                query_ctx,
289            )
290            .await
291    }
292
293    #[tracing::instrument(skip_all)]
294    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
295        let _permit = if let Some(p) = &self.inner.parallelism {
296            Some(p.acquire().await?)
297        } else {
298            None
299        };
300
301        let ctx = request.header.as_ref().map(|h| h.into());
302        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
303
304        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
305            .context(DataFusionSnafu)?;
306        let mut injector = injector_builder
307            .build(self, request.region_id, query_ctx.clone())
308            .await?;
309
310        let plan = request
311            .plan
312            .rewrite(&mut injector)
313            .context(DataFusionSnafu)?
314            .data;
315
316        self.inner
317            .handle_read(QueryRequest { plan, ..request }, query_ctx)
318            .await
319    }
320
321    /// Returns all opened and reportable regions.
322    ///
323    /// Notes: except all metrics regions.
324    pub fn reportable_regions(&self) -> Vec<RegionStat> {
325        self.inner
326            .region_map
327            .iter()
328            .filter_map(|e| {
329                let region_id = *e.key();
330                // Filters out any regions whose role equals None.
331                e.role(region_id).map(|role| RegionStat {
332                    region_id,
333                    engine: e.value().name().to_string(),
334                    role,
335                })
336            })
337            .collect()
338    }
339
340    /// Returns the reportable topics.
341    pub fn topic_stats(&self) -> Vec<TopicStat> {
342        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
343        let Some(reporter) = reporter.as_mut() else {
344            return vec![];
345        };
346        reporter
347            .reportable_topics()
348            .into_iter()
349            .map(|stat| TopicStat {
350                topic_name: stat.topic,
351                record_size: stat.record_size,
352                record_num: stat.record_num,
353                latest_entry_id: stat.latest_entry_id,
354            })
355            .collect()
356    }
357
358    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
359        self.inner.region_map.get(&region_id).and_then(|engine| {
360            engine.role(region_id).map(|role| match role {
361                RegionRole::Follower => false,
362                RegionRole::Leader => true,
363                RegionRole::DowngradingLeader => true,
364            })
365        })
366    }
367
368    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
369        let engine = self
370            .inner
371            .region_map
372            .get(&region_id)
373            .with_context(|| RegionNotFoundSnafu { region_id })?;
374        engine
375            .set_region_role(region_id, role)
376            .with_context(|_| HandleRegionRequestSnafu { region_id })
377    }
378
379    /// Set region role state gracefully.
380    ///
381    /// For [SettableRegionRoleState::Follower]:
382    /// After the call returns, the engine ensures that
383    /// no **further** write or flush operations will succeed in this region.
384    ///
385    /// For [SettableRegionRoleState::DowngradingLeader]:
386    /// After the call returns, the engine ensures that
387    /// no **further** write operations will succeed in this region.
388    pub async fn set_region_role_state_gracefully(
389        &self,
390        region_id: RegionId,
391        state: SettableRegionRoleState,
392    ) -> Result<SetRegionRoleStateResponse> {
393        match self.inner.region_map.get(&region_id) {
394            Some(engine) => Ok(engine
395                .set_region_role_state_gracefully(region_id, state)
396                .await
397                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
398            None => Ok(SetRegionRoleStateResponse::NotFound),
399        }
400    }
401
402    pub fn runtime(&self) -> Runtime {
403        self.inner.runtime.clone()
404    }
405
406    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
407        match self.inner.region_map.get(&region_id) {
408            Some(e) => e.region_statistic(region_id),
409            None => None,
410        }
411    }
412
413    /// Stop the region server.
414    pub async fn stop(&self) -> Result<()> {
415        self.inner.stop().await
416    }
417
418    #[cfg(test)]
419    /// Registers a region for test purpose.
420    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
421        {
422            let mut engines = self.inner.engines.write().unwrap();
423            if !engines.contains_key(engine.name()) {
424                debug!("Registering test engine: {}", engine.name());
425                engines.insert(engine.name().to_string(), engine.clone());
426            }
427        }
428
429        self.inner
430            .region_map
431            .insert(region_id, RegionEngineWithStatus::Ready(engine));
432    }
433
434    async fn handle_batch_ddl_requests(
435        &self,
436        request: region_request::Body,
437    ) -> Result<RegionResponse> {
438        // Safety: we have already checked the request type in `RegionServer::handle()`.
439        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
440            .context(BuildRegionRequestsSnafu)?
441            .unwrap();
442        let tracing_context = TracingContext::from_current_span();
443
444        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
445        self.inner
446            .handle_batch_request(batch_request)
447            .trace(span)
448            .await
449    }
450
451    async fn handle_requests_in_parallel(
452        &self,
453        request: region_request::Body,
454    ) -> Result<RegionResponse> {
455        let requests =
456            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
457
458        // Try to optimize batch Put requests for metric engine
459        // Returns either Some(response) or None(requests_back)
460        match self.try_handle_metric_batch_puts(requests).await? {
461            Either::Left(response) => Ok(response),
462            Either::Right(requests) => {
463                // Fallback: original parallel processing
464                let tracing_context = TracingContext::from_current_span();
465                let join_tasks =
466                    requests
467                        .into_iter()
468                        .map(|(region_id, req): (RegionId, RegionRequest)| {
469                            let self_to_move = self;
470                            let span = tracing_context.attach(info_span!(
471                                "RegionServer::handle_region_request",
472                                region_id = region_id.to_string()
473                            ));
474                            async move {
475                                self_to_move
476                                    .handle_request(region_id, req)
477                                    .trace(span)
478                                    .await
479                            }
480                        });
481
482                let results = try_join_all(join_tasks).await?;
483                let mut affected_rows = 0;
484                let mut extensions = HashMap::new();
485                for result in results {
486                    affected_rows += result.affected_rows;
487                    extensions.extend(result.extensions);
488                }
489
490                Ok(RegionResponse {
491                    affected_rows,
492                    extensions,
493                    metadata: Vec::new(),
494                })
495            }
496        }
497    }
498
499    async fn handle_requests_in_serial(
500        &self,
501        request: region_request::Body,
502    ) -> Result<RegionResponse> {
503        let requests =
504            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
505        let tracing_context = TracingContext::from_current_span();
506
507        let mut affected_rows = 0;
508        let mut extensions = HashMap::new();
509        for (region_id, req) in requests {
510            let span = tracing_context.attach(info_span!(
511                "RegionServer::handle_region_request",
512                region_id = region_id.to_string()
513            ));
514            let result = self.handle_request(region_id, req).trace(span).await?;
515
516            affected_rows += result.affected_rows;
517            extensions.extend(result.extensions);
518        }
519
520        Ok(RegionResponse {
521            affected_rows,
522            extensions,
523            metadata: Vec::new(),
524        })
525    }
526
527    /// Attempts to optimize batch Put requests for metric engine.
528    ///
529    /// Returns Either::Left(response) if optimization succeeded,
530    /// or Either::Right(original_requests) to fall back to parallel processing.
531    ///
532    /// This avoids cloning requests when optimization cannot be applied.
533    async fn try_handle_metric_batch_puts(
534        &self,
535        requests: Vec<(RegionId, RegionRequest)>,
536    ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
537        if requests.is_empty() {
538            return Ok(Either::Right(requests));
539        }
540
541        // Quick check: verify first request is Put and is metric engine
542        if !matches!(requests[0].1, RegionRequest::Put(_)) {
543            return Ok(Either::Right(requests));
544        }
545        let first_region_id = requests[0].0;
546        let request_type = requests[0].1.request_type();
547
548        // SAFETY: If the first request belongs to metric engine, then ALL requests
549        // in this batch are guaranteed to belong to metric engine. This invariant
550        // is maintained by the request batching logic upstream.
551        let engine = match self
552            .inner
553            .get_engine(first_region_id, &RegionChange::None)?
554        {
555            CurrentEngine::Engine(e) => e,
556            _ => return Ok(Either::Right(requests)),
557        };
558
559        if engine.name() != METRIC_ENGINE_NAME {
560            return Ok(Either::Right(requests));
561        }
562
563        // Check if ALL requests are Put (now we know it's worth checking)
564        let mut all_puts = true;
565        for (_, req) in &requests {
566            if !matches!(req, RegionRequest::Put(_)) {
567                all_puts = false;
568                break;
569            }
570        }
571
572        if !all_puts {
573            return Ok(Either::Right(requests));
574        }
575
576        // Now extract Put requests by consuming ownership (zero clone!)
577        let put_requests = requests.into_iter().map(|(region_id, req)| {
578            if let RegionRequest::Put(put) = req {
579                (region_id, put)
580            } else {
581                unreachable!("Already checked all are Put")
582            }
583        });
584
585        // Downcast to MetricEngine and call batch API
586        let metric_engine =
587            engine
588                .as_any()
589                .downcast_ref::<MetricEngine>()
590                .context(UnexpectedSnafu {
591                    violated: "Failed to downcast to MetricEngine",
592                })?;
593
594        let tracing_context = TracingContext::from_current_span();
595        let batch_size = put_requests.len();
596        let span = tracing_context.attach(info_span!(
597            "RegionServer::handle_metric_batch_puts",
598            batch_size = batch_size,
599        ));
600        let result = metric_engine
601            .put_regions_batch(put_requests)
602            .trace(span)
603            .await
604            .map_err(BoxedError::new)
605            .context(HandleRegionRequestSnafu {
606                region_id: first_region_id,
607            });
608
609        match result {
610            Ok(total_affected) => {
611                crate::metrics::REGION_CHANGED_ROW_COUNT
612                    .with_label_values(&[request_type])
613                    .inc_by(total_affected as u64);
614                Ok(Either::Left(RegionResponse::new(total_affected)))
615            }
616            Err(err) => {
617                crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
618                    .with_label_values(&[request_type])
619                    .inc_by(batch_size as u64);
620                Err(err)
621            }
622        }
623    }
624
625    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
626        let region_id = RegionId::from_u64(request.region_id);
627        let manifest_info = request
628            .manifest_info
629            .context(error::MissingRequiredFieldSnafu {
630                name: "manifest_info",
631            })?;
632
633        let manifest_info = match manifest_info {
634            ManifestInfo::MitoManifestInfo(info) => {
635                RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
636            }
637            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
638                info.data_manifest_version,
639                0,
640                info.metadata_manifest_version,
641                0,
642            ),
643        };
644
645        let tracing_context = TracingContext::from_current_span();
646        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
647
648        self.sync_region(
649            region_id,
650            SyncRegionFromRequest::from_manifest(manifest_info),
651        )
652        .trace(span)
653        .await
654        .map(|_| RegionResponse::new(AffectedRows::default()))
655    }
656
657    /// Handles the ListMetadata request and retrieves metadata for specified regions.
658    ///
659    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
660    /// non-existing regions as `null`.
661    #[tracing::instrument(skip_all)]
662    async fn handle_list_metadata_request(
663        &self,
664        request: &ListMetadataRequest,
665    ) -> Result<RegionResponse> {
666        let mut region_metadatas = Vec::new();
667        // Collect metadata for each region
668        for region_id in &request.region_ids {
669            let region_id = RegionId::from_u64(*region_id);
670            // Get the engine.
671            let Some(engine) = self.find_engine(region_id)? else {
672                region_metadatas.push(None);
673                continue;
674            };
675
676            match engine.get_metadata(region_id).await {
677                Ok(metadata) => region_metadatas.push(Some(metadata)),
678                Err(err) => {
679                    if err.status_code() == StatusCode::RegionNotFound {
680                        region_metadatas.push(None);
681                    } else {
682                        Err(err).with_context(|_| GetRegionMetadataSnafu {
683                            engine: engine.name(),
684                            region_id,
685                        })?;
686                    }
687                }
688            }
689        }
690
691        // Serialize metadata to JSON
692        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
693
694        let response = RegionResponse::from_metadata(json_result);
695
696        Ok(response)
697    }
698
699    /// Sync region manifest and registers new opened logical regions.
700    pub async fn sync_region(
701        &self,
702        region_id: RegionId,
703        request: SyncRegionFromRequest,
704    ) -> Result<()> {
705        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
706            CurrentEngine::Engine(engine) => engine,
707            _ => {
708                return UnexpectedSnafu {
709                    violated: "unexpected EarlyReturn engine status for a ready region",
710                }
711                .fail();
712            }
713        };
714
715        self.inner
716            .handle_sync_region(&engine, region_id, request)
717            .await
718    }
719
720    /// Remaps manifests from old regions to new regions.
721    pub async fn remap_manifests(
722        &self,
723        request: RemapManifestsRequest,
724    ) -> Result<RemapManifestsResponse> {
725        let region_id = request.region_id;
726        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
727            CurrentEngine::Engine(engine) => engine,
728            _ => {
729                return UnexpectedSnafu {
730                    violated: "unexpected EarlyReturn engine status for a ready region",
731                }
732                .fail();
733            }
734        };
735
736        engine
737            .remap_manifests(request)
738            .await
739            .with_context(|_| HandleRegionRequestSnafu { region_id })
740    }
741
742    fn is_suspended(&self) -> bool {
743        self.suspend.load(Ordering::Relaxed)
744    }
745
746    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
747        self.suspend.clone()
748    }
749}
750
751#[async_trait]
752impl RegionServerHandler for RegionServer {
753    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
754        let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
755            .with_label_values(&[request.as_ref()]);
756        let response = match &request {
757            region_request::Body::Creates(_)
758            | region_request::Body::Drops(_)
759            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
760            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
761                self.handle_requests_in_parallel(request).await
762            }
763            region_request::Body::Sync(sync_request) => {
764                self.handle_sync_region_request(sync_request).await
765            }
766            region_request::Body::ListMetadata(list_metadata_request) => {
767                self.handle_list_metadata_request(list_metadata_request)
768                    .await
769            }
770            _ => self.handle_requests_in_serial(request).await,
771        }
772        .map_err(BoxedError::new)
773        .inspect_err(|_| {
774            failed_requests_cnt.inc();
775        })
776        .context(ExecuteGrpcRequestSnafu)?;
777
778        Ok(RegionResponseV1 {
779            header: Some(ResponseHeader {
780                status: Some(Status {
781                    status_code: StatusCode::Success as _,
782                    ..Default::default()
783                }),
784            }),
785            affected_rows: response.affected_rows as _,
786            extensions: response.extensions,
787            metadata: response.metadata,
788        })
789    }
790}
791
792#[async_trait]
793impl FlightCraft for RegionServer {
794    async fn do_get(
795        &self,
796        request: Request<Ticket>,
797    ) -> TonicResult<Response<TonicStream<FlightData>>> {
798        ensure!(!self.is_suspended(), SuspendedSnafu);
799
800        let ticket = request.into_inner().ticket;
801        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
802            .context(servers_error::InvalidFlightTicketSnafu)?;
803        let tracing_context = request
804            .header
805            .as_ref()
806            .map(|h| TracingContext::from_w3c(&h.tracing_context))
807            .unwrap_or_default();
808        let query_ctx = request
809            .header
810            .as_ref()
811            .map(|h| Arc::new(QueryContext::from(h)))
812            .unwrap_or(QueryContext::arc());
813
814        let result = self
815            .handle_remote_read(request, query_ctx.clone())
816            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
817            .await?;
818
819        let stream = Box::pin(FlightRecordBatchStream::new(
820            result,
821            tracing_context,
822            self.flight_compression,
823            query_ctx,
824        ));
825        Ok(Response::new(stream))
826    }
827}
828
829#[derive(Clone)]
830enum RegionEngineWithStatus {
831    // An opening, or creating region.
832    Registering(RegionEngineRef),
833    // A closing, or dropping region.
834    Deregistering(RegionEngineRef),
835    // A ready region.
836    Ready(RegionEngineRef),
837}
838
839impl RegionEngineWithStatus {
840    /// Returns [RegionEngineRef].
841    pub fn into_engine(self) -> RegionEngineRef {
842        match self {
843            RegionEngineWithStatus::Registering(engine) => engine,
844            RegionEngineWithStatus::Deregistering(engine) => engine,
845            RegionEngineWithStatus::Ready(engine) => engine,
846        }
847    }
848}
849
850impl Deref for RegionEngineWithStatus {
851    type Target = RegionEngineRef;
852
853    fn deref(&self) -> &Self::Target {
854        match self {
855            RegionEngineWithStatus::Registering(engine) => engine,
856            RegionEngineWithStatus::Deregistering(engine) => engine,
857            RegionEngineWithStatus::Ready(engine) => engine,
858        }
859    }
860}
861
/// Shared state behind a region server: registered engines, per-region engine/status
/// bookkeeping, and the query-side machinery.
struct RegionServerInner {
    /// Registered region engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// The engine serving each region, together with the region's lifecycle status.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine used to execute read plans.
    query_engine: QueryEngineRef,
    /// NOTE(review): runtime handle; its users are outside this chunk — confirm purpose.
    runtime: Runtime,
    /// Notified when regions are registered or deregistered.
    event_listener: RegionServerEventListenerRef,
    /// NOTE(review): presumably builds table providers for region queries; users are outside
    /// this chunk — confirm.
    table_provider_factory: TableProviderFactoryRef,
    /// The number of queries allowed to be executed at the same time.
    /// Acts as the last line of defense on the datanode to prevent query overloading.
    parallelism: Option<RegionServerParallelism>,
    /// The topic stats reporter.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    /// server with a concrete engine; acceptable for now to fetch Mito-specific
    /// info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
}
879
880struct RegionServerParallelism {
881    semaphore: Semaphore,
882    timeout: Duration,
883}
884
885impl RegionServerParallelism {
886    pub fn from_opts(
887        max_concurrent_queries: usize,
888        concurrent_query_limiter_timeout: Duration,
889    ) -> Option<Self> {
890        if max_concurrent_queries == 0 {
891            return None;
892        }
893        Some(RegionServerParallelism {
894            semaphore: Semaphore::new(max_concurrent_queries),
895            timeout: concurrent_query_limiter_timeout,
896        })
897    }
898
899    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
900        timeout(self.timeout, self.semaphore.acquire())
901            .await
902            .context(ConcurrentQueryLimiterTimeoutSnafu)?
903            .context(ConcurrentQueryLimiterClosedSnafu)
904    }
905}
906
907enum CurrentEngine {
908    Engine(RegionEngineRef),
909    EarlyReturn(AffectedRows),
910}
911
912impl Debug for CurrentEngine {
913    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
914        match self {
915            CurrentEngine::Engine(engine) => f
916                .debug_struct("CurrentEngine")
917                .field("engine", &engine.name())
918                .finish(),
919            CurrentEngine::EarlyReturn(rows) => f
920                .debug_struct("CurrentEngine")
921                .field("return", rows)
922                .finish(),
923        }
924    }
925}
926
927impl RegionServerInner {
928    pub fn new(
929        query_engine: QueryEngineRef,
930        runtime: Runtime,
931        event_listener: RegionServerEventListenerRef,
932        table_provider_factory: TableProviderFactoryRef,
933        parallelism: Option<RegionServerParallelism>,
934    ) -> Self {
935        Self {
936            engines: RwLock::new(HashMap::new()),
937            region_map: DashMap::new(),
938            query_engine,
939            runtime,
940            event_listener,
941            table_provider_factory,
942            parallelism,
943            topic_stats_reporter: RwLock::new(None),
944            mito_engine: RwLock::new(None),
945        }
946    }
947
948    pub fn register_engine(&self, engine: RegionEngineRef) {
949        let engine_name = engine.name();
950        if engine_name == MITO_ENGINE_NAME
951            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
952        {
953            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
954        }
955
956        info!("Region Engine {engine_name} is registered");
957        self.engines
958            .write()
959            .unwrap()
960            .insert(engine_name.to_string(), engine);
961    }
962
963    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
964        info!("Set topic stats reporter");
965        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
966    }
967
968    fn get_engine(
969        &self,
970        region_id: RegionId,
971        region_change: &RegionChange,
972    ) -> Result<CurrentEngine> {
973        let current_region_status = self.region_map.get(&region_id);
974
975        let engine = match region_change {
976            RegionChange::Register(attribute) => match current_region_status {
977                Some(status) => match status.clone() {
978                    RegionEngineWithStatus::Registering(engine) => engine,
979                    RegionEngineWithStatus::Deregistering(_) => {
980                        return error::RegionBusySnafu { region_id }.fail();
981                    }
982                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
983                },
984                _ => self
985                    .engines
986                    .read()
987                    .unwrap()
988                    .get(attribute.engine())
989                    .with_context(|| RegionEngineNotFoundSnafu {
990                        name: attribute.engine(),
991                    })?
992                    .clone(),
993            },
994            RegionChange::Deregisters => match current_region_status {
995                Some(status) => match status.clone() {
996                    RegionEngineWithStatus::Registering(_) => {
997                        return error::RegionBusySnafu { region_id }.fail();
998                    }
999                    RegionEngineWithStatus::Deregistering(_) => {
1000                        return Ok(CurrentEngine::EarlyReturn(0));
1001                    }
1002                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1003                },
1004                None => return Ok(CurrentEngine::EarlyReturn(0)),
1005            },
1006            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1007                match current_region_status {
1008                    Some(status) => match status.clone() {
1009                        RegionEngineWithStatus::Registering(_) => {
1010                            return error::RegionNotReadySnafu { region_id }.fail();
1011                        }
1012                        RegionEngineWithStatus::Deregistering(_) => {
1013                            return error::RegionNotFoundSnafu { region_id }.fail();
1014                        }
1015                        RegionEngineWithStatus::Ready(engine) => engine,
1016                    },
1017                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
1018                }
1019            }
1020        };
1021
1022        Ok(CurrentEngine::Engine(engine))
1023    }
1024
1025    async fn handle_batch_open_requests_inner(
1026        &self,
1027        engine: RegionEngineRef,
1028        parallelism: usize,
1029        requests: Vec<(RegionId, RegionOpenRequest)>,
1030        ignore_nonexistent_region: bool,
1031    ) -> Result<Vec<RegionId>> {
1032        let region_changes = requests
1033            .iter()
1034            .map(|(region_id, open)| {
1035                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1036                Ok((*region_id, RegionChange::Register(attribute)))
1037            })
1038            .collect::<Result<HashMap<_, _>>>()?;
1039
1040        for (&region_id, region_change) in &region_changes {
1041            self.set_region_status_not_ready(region_id, &engine, region_change)
1042        }
1043
1044        let mut open_regions = Vec::with_capacity(requests.len());
1045        let mut errors = vec![];
1046        match engine
1047            .handle_batch_open_requests(parallelism, requests)
1048            .await
1049            .with_context(|_| HandleBatchOpenRequestSnafu)
1050        {
1051            Ok(results) => {
1052                for (region_id, result) in results {
1053                    let region_change = &region_changes[&region_id];
1054                    match result {
1055                        Ok(_) => {
1056                            if let Err(e) = self
1057                                .set_region_status_ready(region_id, engine.clone(), *region_change)
1058                                .await
1059                            {
1060                                error!(e; "Failed to set region to ready: {}", region_id);
1061                                errors.push(BoxedError::new(e));
1062                            } else {
1063                                open_regions.push(region_id)
1064                            }
1065                        }
1066                        Err(e) => {
1067                            self.unset_region_status(region_id, &engine, *region_change);
1068                            if e.status_code() == StatusCode::RegionNotFound
1069                                && ignore_nonexistent_region
1070                            {
1071                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1072                            } else {
1073                                error!(e; "Failed to open region: {}", region_id);
1074                                errors.push(e);
1075                            }
1076                        }
1077                    }
1078                }
1079            }
1080            Err(e) => {
1081                for (&region_id, region_change) in &region_changes {
1082                    self.unset_region_status(region_id, &engine, *region_change);
1083                }
1084                error!(e; "Failed to open batch regions");
1085                errors.push(BoxedError::new(e));
1086            }
1087        }
1088
1089        if !errors.is_empty() {
1090            return error::UnexpectedSnafu {
1091                // Returns the first error.
1092                violated: format!("Failed to open batch regions: {:?}", errors[0]),
1093            }
1094            .fail();
1095        }
1096
1097        Ok(open_regions)
1098    }
1099
1100    pub async fn handle_batch_open_requests(
1101        &self,
1102        parallelism: usize,
1103        requests: Vec<(RegionId, RegionOpenRequest)>,
1104        ignore_nonexistent_region: bool,
1105    ) -> Result<Vec<RegionId>> {
1106        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1107            HashMap::with_capacity(requests.len());
1108        for (region_id, request) in requests {
1109            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1110                requests.push((region_id, request));
1111            } else {
1112                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1113            }
1114        }
1115
1116        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1117        for (engine, requests) in engine_grouped_requests {
1118            let engine = self
1119                .engines
1120                .read()
1121                .unwrap()
1122                .get(&engine)
1123                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1124                .clone();
1125            results.push(
1126                self.handle_batch_open_requests_inner(
1127                    engine,
1128                    parallelism,
1129                    requests,
1130                    ignore_nonexistent_region,
1131                )
1132                .await,
1133            )
1134        }
1135
1136        Ok(results
1137            .into_iter()
1138            .collect::<Result<Vec<_>>>()?
1139            .into_iter()
1140            .flatten()
1141            .collect::<Vec<_>>())
1142    }
1143
1144    pub async fn handle_batch_catchup_requests_inner(
1145        &self,
1146        engine: RegionEngineRef,
1147        parallelism: usize,
1148        requests: Vec<(RegionId, RegionCatchupRequest)>,
1149    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1150        for (region_id, _) in &requests {
1151            self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1152        }
1153        let region_ids = requests
1154            .iter()
1155            .map(|(region_id, _)| *region_id)
1156            .collect::<Vec<_>>();
1157        let mut responses = Vec::with_capacity(requests.len());
1158        match engine
1159            .handle_batch_catchup_requests(parallelism, requests)
1160            .await
1161        {
1162            Ok(results) => {
1163                for (region_id, result) in results {
1164                    match result {
1165                        Ok(_) => {
1166                            if let Err(e) = self
1167                                .set_region_status_ready(
1168                                    region_id,
1169                                    engine.clone(),
1170                                    RegionChange::Catchup,
1171                                )
1172                                .await
1173                            {
1174                                error!(e; "Failed to set region to ready: {}", region_id);
1175                                responses.push((region_id, Err(BoxedError::new(e))));
1176                            } else {
1177                                responses.push((region_id, Ok(())));
1178                            }
1179                        }
1180                        Err(e) => {
1181                            self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1182                            error!(e; "Failed to catchup region: {}", region_id);
1183                            responses.push((region_id, Err(e)));
1184                        }
1185                    }
1186                }
1187            }
1188            Err(e) => {
1189                for region_id in region_ids {
1190                    self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1191                }
1192                error!(e; "Failed to catchup batch regions");
1193                return error::UnexpectedSnafu {
1194                    violated: format!("Failed to catchup batch regions: {:?}", e),
1195                }
1196                .fail();
1197            }
1198        }
1199
1200        Ok(responses)
1201    }
1202
1203    pub async fn handle_batch_catchup_requests(
1204        &self,
1205        parallelism: usize,
1206        requests: Vec<(RegionId, RegionCatchupRequest)>,
1207    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1208        let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1209
1210        let mut responses = Vec::with_capacity(requests.len());
1211        for (region_id, request) in requests {
1212            if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1213                match engine {
1214                    CurrentEngine::Engine(engine) => {
1215                        engine_grouped_requests
1216                            .entry(engine.name().to_string())
1217                            .or_default()
1218                            .push((region_id, request));
1219                    }
1220                    CurrentEngine::EarlyReturn(_) => {
1221                        return error::UnexpectedSnafu {
1222                            violated: format!("Unexpected engine type for region {}", region_id),
1223                        }
1224                        .fail();
1225                    }
1226                }
1227            } else {
1228                responses.push((
1229                    region_id,
1230                    Err(BoxedError::new(
1231                        error::RegionNotFoundSnafu { region_id }.build(),
1232                    )),
1233                ));
1234            }
1235        }
1236
1237        for (engine, requests) in engine_grouped_requests {
1238            let engine = self
1239                .engines
1240                .read()
1241                .unwrap()
1242                .get(&engine)
1243                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1244                .clone();
1245            responses.extend(
1246                self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1247                    .await?,
1248            );
1249        }
1250
1251        Ok(responses)
1252    }
1253
1254    // Handle requests in batch.
1255    //
1256    // limitation: all create requests must be in the same engine.
1257    pub async fn handle_batch_request(
1258        &self,
1259        batch_request: BatchRegionDdlRequest,
1260    ) -> Result<RegionResponse> {
1261        let region_changes = match &batch_request {
1262            BatchRegionDdlRequest::Create(requests) => requests
1263                .iter()
1264                .map(|(region_id, create)| {
1265                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
1266                    Ok((*region_id, RegionChange::Register(attribute)))
1267                })
1268                .collect::<Result<Vec<_>>>()?,
1269            BatchRegionDdlRequest::Drop(requests) => requests
1270                .iter()
1271                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1272                .collect::<Vec<_>>(),
1273            BatchRegionDdlRequest::Alter(requests) => requests
1274                .iter()
1275                .map(|(region_id, _)| (*region_id, RegionChange::None))
1276                .collect::<Vec<_>>(),
1277        };
1278
1279        // The ddl procedure will ensure all requests are in the same engine.
1280        // Therefore, we can get the engine from the first request.
1281        let (first_region_id, first_region_change) = region_changes.first().unwrap();
1282        let engine = match self.get_engine(*first_region_id, first_region_change)? {
1283            CurrentEngine::Engine(engine) => engine,
1284            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1285        };
1286
1287        for (region_id, region_change) in region_changes.iter() {
1288            self.set_region_status_not_ready(*region_id, &engine, region_change);
1289        }
1290
1291        let ddl_type = batch_request.request_type();
1292        let result = engine
1293            .handle_batch_ddl_requests(batch_request)
1294            .await
1295            .context(HandleBatchDdlRequestSnafu { ddl_type });
1296
1297        match result {
1298            Ok(result) => {
1299                for (region_id, region_change) in &region_changes {
1300                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1301                        .await?;
1302                }
1303
1304                Ok(RegionResponse {
1305                    affected_rows: result.affected_rows,
1306                    extensions: result.extensions,
1307                    metadata: Vec::new(),
1308                })
1309            }
1310            Err(err) => {
1311                for (region_id, region_change) in region_changes {
1312                    self.unset_region_status(region_id, &engine, region_change);
1313                }
1314
1315                Err(err)
1316            }
1317        }
1318    }
1319
1320    pub async fn handle_request(
1321        &self,
1322        region_id: RegionId,
1323        request: RegionRequest,
1324    ) -> Result<RegionResponse> {
1325        let request_type = request.request_type();
1326        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1327            .with_label_values(&[request_type])
1328            .start_timer();
1329
1330        let region_change = match &request {
1331            RegionRequest::Create(create) => {
1332                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1333                RegionChange::Register(attribute)
1334            }
1335            RegionRequest::Open(open) => {
1336                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1337                RegionChange::Register(attribute)
1338            }
1339            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1340            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1341                RegionChange::Ingest
1342            }
1343            RegionRequest::Alter(_)
1344            | RegionRequest::Flush(_)
1345            | RegionRequest::Compact(_)
1346            | RegionRequest::Truncate(_)
1347            | RegionRequest::BuildIndex(_)
1348            | RegionRequest::EnterStaging(_)
1349            | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1350            RegionRequest::Catchup(_) => RegionChange::Catchup,
1351        };
1352
1353        let engine = match self.get_engine(region_id, &region_change)? {
1354            CurrentEngine::Engine(engine) => engine,
1355            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1356        };
1357
1358        // Sets corresponding region status to registering/deregistering before the operation.
1359        self.set_region_status_not_ready(region_id, &engine, &region_change);
1360
1361        match engine
1362            .handle_request(region_id, request)
1363            .await
1364            .with_context(|_| HandleRegionRequestSnafu { region_id })
1365        {
1366            Ok(result) => {
1367                // Update metrics
1368                if matches!(region_change, RegionChange::Ingest) {
1369                    crate::metrics::REGION_CHANGED_ROW_COUNT
1370                        .with_label_values(&[request_type])
1371                        .inc_by(result.affected_rows as u64);
1372                }
1373                // Sets corresponding region status to ready.
1374                self.set_region_status_ready(region_id, engine.clone(), region_change)
1375                    .await?;
1376
1377                Ok(RegionResponse {
1378                    affected_rows: result.affected_rows,
1379                    extensions: result.extensions,
1380                    metadata: Vec::new(),
1381                })
1382            }
1383            Err(err) => {
1384                if matches!(region_change, RegionChange::Ingest) {
1385                    crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1386                        .with_label_values(&[request_type])
1387                        .inc();
1388                }
1389                // Removes the region status if the operation fails.
1390                self.unset_region_status(region_id, &engine, region_change);
1391                Err(err)
1392            }
1393        }
1394    }
1395
1396    /// Handles the sync region request.
1397    pub async fn handle_sync_region(
1398        &self,
1399        engine: &RegionEngineRef,
1400        region_id: RegionId,
1401        request: SyncRegionFromRequest,
1402    ) -> Result<()> {
1403        let Some(new_opened_regions) = engine
1404            .sync_region(region_id, request)
1405            .await
1406            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1407            .new_opened_logical_region_ids()
1408        else {
1409            return Ok(());
1410        };
1411
1412        for region in new_opened_regions {
1413            self.region_map
1414                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1415            info!("Logical region {} is registered!", region);
1416        }
1417
1418        Ok(())
1419    }
1420
1421    fn set_region_status_not_ready(
1422        &self,
1423        region_id: RegionId,
1424        engine: &RegionEngineRef,
1425        region_change: &RegionChange,
1426    ) {
1427        match region_change {
1428            RegionChange::Register(_) => {
1429                self.region_map.insert(
1430                    region_id,
1431                    RegionEngineWithStatus::Registering(engine.clone()),
1432                );
1433            }
1434            RegionChange::Deregisters => {
1435                self.region_map.insert(
1436                    region_id,
1437                    RegionEngineWithStatus::Deregistering(engine.clone()),
1438                );
1439            }
1440            _ => {}
1441        }
1442    }
1443
1444    fn unset_region_status(
1445        &self,
1446        region_id: RegionId,
1447        engine: &RegionEngineRef,
1448        region_change: RegionChange,
1449    ) {
1450        match region_change {
1451            RegionChange::None | RegionChange::Ingest => {}
1452            RegionChange::Register(_) => {
1453                self.region_map.remove(&region_id);
1454            }
1455            RegionChange::Deregisters => {
1456                self.region_map
1457                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1458            }
1459            RegionChange::Catchup => {}
1460        }
1461    }
1462
1463    async fn set_region_status_ready(
1464        &self,
1465        region_id: RegionId,
1466        engine: RegionEngineRef,
1467        region_change: RegionChange,
1468    ) -> Result<()> {
1469        let engine_type = engine.name();
1470        match region_change {
1471            RegionChange::None | RegionChange::Ingest => {}
1472            RegionChange::Register(attribute) => {
1473                info!(
1474                    "Region {region_id} is registered to engine {}",
1475                    attribute.engine()
1476                );
1477                self.region_map
1478                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1479
1480                match attribute {
1481                    RegionAttribute::Metric { physical } => {
1482                        if physical {
1483                            // Registers the logical regions belong to the physical region (`region_id`).
1484                            self.register_logical_regions(&engine, region_id).await?;
1485                            // We only send the `on_region_registered` event of the physical region.
1486                            self.event_listener.on_region_registered(region_id);
1487                        }
1488                    }
1489                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1490                    RegionAttribute::File => {
1491                        // do nothing
1492                    }
1493                }
1494            }
1495            RegionChange::Deregisters => {
1496                info!("Region {region_id} is deregistered from engine {engine_type}");
1497                self.region_map
1498                    .remove(&region_id)
1499                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1500                self.event_listener.on_region_deregistered(region_id);
1501            }
1502            RegionChange::Catchup => {
1503                if is_metric_engine(engine.name()) {
1504                    // Registers the logical regions belong to the physical region (`region_id`).
1505                    self.register_logical_regions(&engine, region_id).await?;
1506                }
1507            }
1508        }
1509        Ok(())
1510    }
1511
1512    async fn register_logical_regions(
1513        &self,
1514        engine: &RegionEngineRef,
1515        physical_region_id: RegionId,
1516    ) -> Result<()> {
1517        let metric_engine =
1518            engine
1519                .as_any()
1520                .downcast_ref::<MetricEngine>()
1521                .context(UnexpectedSnafu {
1522                    violated: format!(
1523                        "expecting engine type '{}', actual '{}'",
1524                        METRIC_ENGINE_NAME,
1525                        engine.name(),
1526                    ),
1527                })?;
1528
1529        let logical_regions = metric_engine
1530            .logical_regions(physical_region_id)
1531            .await
1532            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1533
1534        for region in logical_regions {
1535            self.region_map
1536                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1537            info!("Logical region {} is registered!", region);
1538        }
1539        Ok(())
1540    }
1541
1542    pub async fn handle_read(
1543        &self,
1544        request: QueryRequest,
1545        query_ctx: QueryContextRef,
1546    ) -> Result<SendableRecordBatchStream> {
1547        // TODO(ruihang): add metrics and set trace id
1548
1549        let result = self
1550            .query_engine
1551            .execute(request.plan, query_ctx)
1552            .await
1553            .context(ExecuteLogicalPlanSnafu)?;
1554
1555        match result.data {
1556            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1557                UnsupportedOutputSnafu { expected: "stream" }.fail()
1558            }
1559            OutputData::Stream(stream) => Ok(stream),
1560        }
1561    }
1562
1563    async fn stop(&self) -> Result<()> {
1564        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1565        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1566        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1567        //
1568        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1569        // it here, collect the values first then use later separately.
1570
1571        let regions = self
1572            .region_map
1573            .iter()
1574            .map(|x| (*x.key(), x.value().clone()))
1575            .collect::<Vec<_>>();
1576        let num_regions = regions.len();
1577
1578        for (region_id, engine) in regions {
1579            let closed = engine
1580                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1581                .await;
1582            match closed {
1583                Ok(_) => debug!("Region {region_id} is closed"),
1584                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1585            }
1586        }
1587        self.region_map.clear();
1588        info!("closed {num_regions} regions");
1589
1590        drop(self.mito_engine.write().unwrap().take());
1591        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1592        for (engine_name, engine) in engines {
1593            engine
1594                .stop()
1595                .await
1596                .context(StopRegionEngineSnafu { name: &engine_name })?;
1597            info!("Region engine {engine_name} is stopped");
1598        }
1599
1600        Ok(())
1601    }
1602}
1603
1604#[derive(Debug, Clone, Copy)]
1605enum RegionChange {
1606    None,
1607    Register(RegionAttribute),
1608    Deregisters,
1609    Catchup,
1610    Ingest,
1611}
1612
1613fn is_metric_engine(engine: &str) -> bool {
1614    engine == METRIC_ENGINE_NAME
1615}
1616
1617fn parse_region_attribute(
1618    engine: &str,
1619    options: &HashMap<String, String>,
1620) -> Result<RegionAttribute> {
1621    match engine {
1622        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1623        METRIC_ENGINE_NAME => {
1624            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1625
1626            Ok(RegionAttribute::Metric { physical })
1627        }
1628        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1629        _ => error::UnexpectedSnafu {
1630            violated: format!("Unknown engine: {}", engine),
1631        }
1632        .fail(),
1633    }
1634}
1635
/// Engine-specific attributes of a region, derived by `parse_region_attribute`.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region served by the mito engine.
    Mito,
    /// A region served by the metric engine.
    ///
    /// `physical` is `false` when the region's options contain `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region served by the file engine.
    File,
}
1642
1643impl RegionAttribute {
1644    fn engine(&self) -> &'static str {
1645        match self {
1646            RegionAttribute::Mito => MITO_ENGINE_NAME,
1647            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1648            RegionAttribute::File => FILE_ENGINE_NAME,
1649        }
1650    }
1651}
1652
1653#[cfg(test)]
1654mod tests {
1655
1656    use std::assert_matches::assert_matches;
1657
1658    use api::v1::SemanticType;
1659    use common_error::ext::ErrorExt;
1660    use datatypes::prelude::ConcreteDataType;
1661    use mito2::test_util::CreateRequestBuilder;
1662    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1663    use store_api::region_engine::RegionEngine;
1664    use store_api::region_request::{
1665        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1666    };
1667    use store_api::storage::RegionId;
1668
1669    use super::*;
1670    use crate::error::Result;
1671    use crate::tests::{MockRegionEngine, mock_region_server};
1672
1673    #[tokio::test]
1674    async fn test_region_registering() {
1675        common_telemetry::init_default_ut_logging();
1676
1677        let mut mock_region_server = mock_region_server();
1678        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1679        let engine_name = engine.name();
1680        mock_region_server.register_engine(engine.clone());
1681        let region_id = RegionId::new(1, 1);
1682        let builder = CreateRequestBuilder::new();
1683        let create_req = builder.build();
1684        // Tries to create/open a registering region.
1685        mock_region_server.inner.region_map.insert(
1686            region_id,
1687            RegionEngineWithStatus::Registering(engine.clone()),
1688        );
1689        let response = mock_region_server
1690            .handle_request(region_id, RegionRequest::Create(create_req))
1691            .await
1692            .unwrap();
1693        assert_eq!(response.affected_rows, 0);
1694        let status = mock_region_server
1695            .inner
1696            .region_map
1697            .get(&region_id)
1698            .unwrap()
1699            .clone();
1700        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1701
1702        mock_region_server.inner.region_map.insert(
1703            region_id,
1704            RegionEngineWithStatus::Registering(engine.clone()),
1705        );
1706        let response = mock_region_server
1707            .handle_request(
1708                region_id,
1709                RegionRequest::Open(RegionOpenRequest {
1710                    engine: engine_name.to_string(),
1711                    table_dir: String::new(),
1712                    path_type: PathType::Bare,
1713                    options: Default::default(),
1714                    skip_wal_replay: false,
1715                    checkpoint: None,
1716                }),
1717            )
1718            .await
1719            .unwrap();
1720        assert_eq!(response.affected_rows, 0);
1721        let status = mock_region_server
1722            .inner
1723            .region_map
1724            .get(&region_id)
1725            .unwrap()
1726            .clone();
1727        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1728    }
1729
1730    #[tokio::test]
1731    async fn test_region_deregistering() {
1732        common_telemetry::init_default_ut_logging();
1733
1734        let mut mock_region_server = mock_region_server();
1735        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1736
1737        mock_region_server.register_engine(engine.clone());
1738
1739        let region_id = RegionId::new(1, 1);
1740
1741        // Tries to drop/close a registering region.
1742        mock_region_server.inner.region_map.insert(
1743            region_id,
1744            RegionEngineWithStatus::Deregistering(engine.clone()),
1745        );
1746
1747        let response = mock_region_server
1748            .handle_request(
1749                region_id,
1750                RegionRequest::Drop(RegionDropRequest {
1751                    fast_path: false,
1752                    force: false,
1753                    partial_drop: false,
1754                }),
1755            )
1756            .await
1757            .unwrap();
1758        assert_eq!(response.affected_rows, 0);
1759
1760        let status = mock_region_server
1761            .inner
1762            .region_map
1763            .get(&region_id)
1764            .unwrap()
1765            .clone();
1766        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1767
1768        mock_region_server.inner.region_map.insert(
1769            region_id,
1770            RegionEngineWithStatus::Deregistering(engine.clone()),
1771        );
1772
1773        let response = mock_region_server
1774            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1775            .await
1776            .unwrap();
1777        assert_eq!(response.affected_rows, 0);
1778
1779        let status = mock_region_server
1780            .inner
1781            .region_map
1782            .get(&region_id)
1783            .unwrap()
1784            .clone();
1785        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1786    }
1787
1788    #[tokio::test]
1789    async fn test_region_not_ready() {
1790        common_telemetry::init_default_ut_logging();
1791
1792        let mut mock_region_server = mock_region_server();
1793        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1794
1795        mock_region_server.register_engine(engine.clone());
1796
1797        let region_id = RegionId::new(1, 1);
1798
1799        // Tries to drop/close a registering region.
1800        mock_region_server.inner.region_map.insert(
1801            region_id,
1802            RegionEngineWithStatus::Registering(engine.clone()),
1803        );
1804
1805        let err = mock_region_server
1806            .handle_request(
1807                region_id,
1808                RegionRequest::Truncate(RegionTruncateRequest::All),
1809            )
1810            .await
1811            .unwrap_err();
1812
1813        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1814    }
1815
1816    #[tokio::test]
1817    async fn test_region_request_failed() {
1818        common_telemetry::init_default_ut_logging();
1819
1820        let mut mock_region_server = mock_region_server();
1821        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1822            MITO_ENGINE_NAME,
1823            Box::new(|_region_id, _request| {
1824                error::UnexpectedSnafu {
1825                    violated: "test".to_string(),
1826                }
1827                .fail()
1828            }),
1829        );
1830
1831        mock_region_server.register_engine(engine.clone());
1832
1833        let region_id = RegionId::new(1, 1);
1834        let builder = CreateRequestBuilder::new();
1835        let create_req = builder.build();
1836        mock_region_server
1837            .handle_request(region_id, RegionRequest::Create(create_req))
1838            .await
1839            .unwrap_err();
1840
1841        let status = mock_region_server.inner.region_map.get(&region_id);
1842        assert!(status.is_none());
1843
1844        mock_region_server
1845            .inner
1846            .region_map
1847            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1848
1849        mock_region_server
1850            .handle_request(
1851                region_id,
1852                RegionRequest::Drop(RegionDropRequest {
1853                    fast_path: false,
1854                    force: false,
1855                    partial_drop: false,
1856                }),
1857            )
1858            .await
1859            .unwrap_err();
1860
1861        let status = mock_region_server.inner.region_map.get(&region_id);
1862        assert!(status.is_some());
1863    }
1864
1865    #[tokio::test]
1866    async fn test_batch_open_region_ignore_nonexistent_regions() {
1867        common_telemetry::init_default_ut_logging();
1868        let mut mock_region_server = mock_region_server();
1869        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1870            MITO_ENGINE_NAME,
1871            Box::new(|region_id, _request| {
1872                if region_id == RegionId::new(1, 1) {
1873                    error::RegionNotFoundSnafu { region_id }.fail()
1874                } else {
1875                    Ok(0)
1876                }
1877            }),
1878        );
1879        mock_region_server.register_engine(engine.clone());
1880
1881        let region_ids = mock_region_server
1882            .handle_batch_open_requests(
1883                8,
1884                vec![
1885                    (
1886                        RegionId::new(1, 1),
1887                        RegionOpenRequest {
1888                            engine: MITO_ENGINE_NAME.to_string(),
1889                            table_dir: String::new(),
1890                            path_type: PathType::Bare,
1891                            options: Default::default(),
1892                            skip_wal_replay: false,
1893                            checkpoint: None,
1894                        },
1895                    ),
1896                    (
1897                        RegionId::new(1, 2),
1898                        RegionOpenRequest {
1899                            engine: MITO_ENGINE_NAME.to_string(),
1900                            table_dir: String::new(),
1901                            path_type: PathType::Bare,
1902                            options: Default::default(),
1903                            skip_wal_replay: false,
1904                            checkpoint: None,
1905                        },
1906                    ),
1907                ],
1908                true,
1909            )
1910            .await
1911            .unwrap();
1912        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1913
1914        let err = mock_region_server
1915            .handle_batch_open_requests(
1916                8,
1917                vec![
1918                    (
1919                        RegionId::new(1, 1),
1920                        RegionOpenRequest {
1921                            engine: MITO_ENGINE_NAME.to_string(),
1922                            table_dir: String::new(),
1923                            path_type: PathType::Bare,
1924                            options: Default::default(),
1925                            skip_wal_replay: false,
1926                            checkpoint: None,
1927                        },
1928                    ),
1929                    (
1930                        RegionId::new(1, 2),
1931                        RegionOpenRequest {
1932                            engine: MITO_ENGINE_NAME.to_string(),
1933                            table_dir: String::new(),
1934                            path_type: PathType::Bare,
1935                            options: Default::default(),
1936                            skip_wal_replay: false,
1937                            checkpoint: None,
1938                        },
1939                    ),
1940                ],
1941                false,
1942            )
1943            .await
1944            .unwrap_err();
1945        assert_eq!(err.status_code(), StatusCode::Unexpected);
1946    }
1947
    /// One case of `test_current_engine`: the region status to seed, the change passed to
    /// `get_engine`, and the assertion applied to its result.
    struct CurrentEngineTest {
        /// Region to seed and look up.
        region_id: RegionId,
        /// Status stored in the region map before the lookup; `None` removes the region.
        current_region_status: Option<RegionEngineWithStatus>,
        /// Registration change passed to `get_engine`.
        region_change: RegionChange,
        /// Assertion run against the result of `get_engine`.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
1954
    /// Table-driven check of `get_engine` (on the server's `inner`) for each combination of
    /// current region status (absent, `Ready`, `Registering`, `Deregistering`) and
    /// `RegionChange::{None, Register, Deregisters}`.
    #[tokio::test]
    async fn test_current_engine() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1024, 1);
        let tests = vec![
            // RegionChange::None
            // Absent region: not found.
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            // Ready region: resolves to its engine.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // Still registering: not ready.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
                }),
            },
            // Being deregistered: reported as not found.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            // RegionChange::Register
            // Absent region: resolves to the engine.
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // Already registering: resolves to the engine.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // Being deregistered: busy.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            // Already ready: resolves to the engine.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // RegionChange::Deregisters
            // Absent region: early return.
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            // Still registering: busy.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            // Already deregistering: early return.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            // Ready region: resolves to the engine.
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
        ];

        for test in tests {
            let CurrentEngineTest {
                region_id,
                current_region_status,
                region_change,
                assert,
            } = test;

            // Seeds the region map with this case's status, or removes the region for `None`.
            if let Some(status) = current_region_status {
                mock_region_server
                    .inner
                    .region_map
                    .insert(region_id, status);
            } else {
                mock_region_server.inner.region_map.remove(&region_id);
            }

            // Resolves the engine and runs the case's assertion on the result.
            let result = mock_region_server
                .inner
                .get_engine(region_id, &region_change);

            assert(result);
        }
    }
2103
2104    #[tokio::test]
2105    async fn test_region_server_parallelism() {
2106        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2107        let first_query = p.acquire().await;
2108        assert!(first_query.is_ok());
2109        let second_query = p.acquire().await;
2110        assert!(second_query.is_ok());
2111        let third_query = p.acquire().await;
2112        assert!(third_query.is_err());
2113        let err = third_query.unwrap_err();
2114        assert_eq!(
2115            err.output_msg(),
2116            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2117        );
2118        drop(first_query);
2119        let forth_query = p.acquire().await;
2120        assert!(forth_query.is_ok());
2121    }
2122
2123    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2124        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2125        metadata_builder.push_column_metadata(ColumnMetadata {
2126            column_schema: datatypes::schema::ColumnSchema::new(
2127                "timestamp",
2128                ConcreteDataType::timestamp_nanosecond_datatype(),
2129                false,
2130            ),
2131            semantic_type: SemanticType::Timestamp,
2132            column_id: 0,
2133        });
2134        metadata_builder.push_column_metadata(ColumnMetadata {
2135            column_schema: datatypes::schema::ColumnSchema::new(
2136                "file",
2137                ConcreteDataType::string_datatype(),
2138                true,
2139            ),
2140            semantic_type: SemanticType::Tag,
2141            column_id: 1,
2142        });
2143        metadata_builder.push_column_metadata(ColumnMetadata {
2144            column_schema: datatypes::schema::ColumnSchema::new(
2145                "message",
2146                ConcreteDataType::string_datatype(),
2147                true,
2148            ),
2149            semantic_type: SemanticType::Field,
2150            column_id: 2,
2151        });
2152        metadata_builder.primary_key(vec![1]);
2153        metadata_builder.build().unwrap()
2154    }
2155
2156    #[tokio::test]
2157    async fn test_handle_list_metadata_request() {
2158        common_telemetry::init_default_ut_logging();
2159
2160        let mut mock_region_server = mock_region_server();
2161        let region_id_1 = RegionId::new(1, 0);
2162        let region_id_2 = RegionId::new(2, 0);
2163
2164        let metadata_1 = mock_region_metadata(region_id_1);
2165        let metadata_2 = mock_region_metadata(region_id_2);
2166        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2167
2168        let metadata_1 = Arc::new(metadata_1);
2169        let metadata_2 = Arc::new(metadata_2);
2170        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2171            MITO_ENGINE_NAME,
2172            Box::new(move |region_id| {
2173                if region_id == region_id_1 {
2174                    Ok(metadata_1.clone())
2175                } else if region_id == region_id_2 {
2176                    Ok(metadata_2.clone())
2177                } else {
2178                    error::RegionNotFoundSnafu { region_id }.fail()
2179                }
2180            }),
2181        );
2182
2183        mock_region_server.register_engine(engine.clone());
2184        mock_region_server
2185            .inner
2186            .region_map
2187            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2188        mock_region_server
2189            .inner
2190            .region_map
2191            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2192
2193        // All regions exist.
2194        let list_metadata_request = ListMetadataRequest {
2195            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2196        };
2197        let response = mock_region_server
2198            .handle_list_metadata_request(&list_metadata_request)
2199            .await
2200            .unwrap();
2201        let decoded_metadata: Vec<Option<RegionMetadata>> =
2202            serde_json::from_slice(&response.metadata).unwrap();
2203        assert_eq!(metadatas, decoded_metadata);
2204    }
2205
2206    #[tokio::test]
2207    async fn test_handle_list_metadata_not_found() {
2208        common_telemetry::init_default_ut_logging();
2209
2210        let mut mock_region_server = mock_region_server();
2211        let region_id_1 = RegionId::new(1, 0);
2212        let region_id_2 = RegionId::new(2, 0);
2213
2214        let metadata_1 = mock_region_metadata(region_id_1);
2215        let metadatas = vec![Some(metadata_1.clone()), None];
2216
2217        let metadata_1 = Arc::new(metadata_1);
2218        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2219            MITO_ENGINE_NAME,
2220            Box::new(move |region_id| {
2221                if region_id == region_id_1 {
2222                    Ok(metadata_1.clone())
2223                } else {
2224                    error::RegionNotFoundSnafu { region_id }.fail()
2225                }
2226            }),
2227        );
2228
2229        mock_region_server.register_engine(engine.clone());
2230        mock_region_server
2231            .inner
2232            .region_map
2233            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2234
2235        // Not in region map.
2236        let list_metadata_request = ListMetadataRequest {
2237            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2238        };
2239        let response = mock_region_server
2240            .handle_list_metadata_request(&list_metadata_request)
2241            .await
2242            .unwrap();
2243        let decoded_metadata: Vec<Option<RegionMetadata>> =
2244            serde_json::from_slice(&response.metadata).unwrap();
2245        assert_eq!(metadatas, decoded_metadata);
2246
2247        // Not in region engine.
2248        mock_region_server
2249            .inner
2250            .region_map
2251            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2252        let response = mock_region_server
2253            .handle_list_metadata_request(&list_metadata_request)
2254            .await
2255            .unwrap();
2256        let decoded_metadata: Vec<Option<RegionMetadata>> =
2257            serde_json::from_slice(&response.metadata).unwrap();
2258        assert_eq!(metadatas, decoded_metadata);
2259    }
2260
2261    #[tokio::test]
2262    async fn test_handle_list_metadata_failed() {
2263        common_telemetry::init_default_ut_logging();
2264
2265        let mut mock_region_server = mock_region_server();
2266        let region_id_1 = RegionId::new(1, 0);
2267
2268        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2269            MITO_ENGINE_NAME,
2270            Box::new(move |region_id| {
2271                error::UnexpectedSnafu {
2272                    violated: format!("Failed to get region {region_id}"),
2273                }
2274                .fail()
2275            }),
2276        );
2277
2278        mock_region_server.register_engine(engine.clone());
2279        mock_region_server
2280            .inner
2281            .region_map
2282            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2283
2284        // Failed to get.
2285        let list_metadata_request = ListMetadataRequest {
2286            region_ids: vec![region_id_1.as_u64()],
2287        };
2288        mock_region_server
2289            .handle_list_metadata_request(&list_metadata_request)
2290            .await
2291            .unwrap_err();
2292    }
2293}