Skip to main content

datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use either::Either;
48use futures_util::future::try_join_all;
49use metric_engine::engine::MetricEngine;
50use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
51use prost::Message;
52use query::QueryEngineRef;
53pub use query::dummy_catalog::{
54    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
55};
56use serde_json;
57use servers::error::{
58    self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
59};
60use servers::grpc::FlightCompression;
61use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
62use servers::grpc::region_server::RegionServerHandler;
63use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metric_engine_consts::{
66    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
67};
68use store_api::region_engine::{
69    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
70    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
71    SyncRegionFromRequest,
72};
73use store_api::region_request::{
74    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
75    RegionOpenRequest, RegionRequest,
76};
77use store_api::storage::RegionId;
78use tokio::sync::{Semaphore, SemaphorePermit};
79use tokio::time::timeout;
80use tonic::{Request, Response, Result as TonicResult};
81
82use crate::error::{
83    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
84    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
85    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
86    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
87    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
88    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
89};
90use crate::event_listener::RegionServerEventListenerRef;
91use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
92
93#[derive(Clone)]
94pub struct RegionServer {
95    inner: Arc<RegionServerInner>,
96    flight_compression: FlightCompression,
97    suspend: Arc<AtomicBool>,
98}
99
100pub struct RegionStat {
101    pub region_id: RegionId,
102    pub engine: String,
103    pub role: RegionRole,
104}
105
106impl RegionServer {
107    pub fn new(
108        query_engine: QueryEngineRef,
109        runtime: Runtime,
110        event_listener: RegionServerEventListenerRef,
111        flight_compression: FlightCompression,
112    ) -> Self {
113        Self::with_table_provider(
114            query_engine,
115            runtime,
116            event_listener,
117            Arc::new(DummyTableProviderFactory),
118            0,
119            Duration::from_millis(0),
120            flight_compression,
121        )
122    }
123
124    pub fn with_table_provider(
125        query_engine: QueryEngineRef,
126        runtime: Runtime,
127        event_listener: RegionServerEventListenerRef,
128        table_provider_factory: TableProviderFactoryRef,
129        max_concurrent_queries: usize,
130        concurrent_query_limiter_timeout: Duration,
131        flight_compression: FlightCompression,
132    ) -> Self {
133        Self {
134            inner: Arc::new(RegionServerInner::new(
135                query_engine,
136                runtime,
137                event_listener,
138                table_provider_factory,
139                RegionServerParallelism::from_opts(
140                    max_concurrent_queries,
141                    concurrent_query_limiter_timeout,
142                ),
143            )),
144            flight_compression,
145            suspend: Arc::new(AtomicBool::new(false)),
146        }
147    }
148
149    /// Registers an engine.
150    pub fn register_engine(&mut self, engine: RegionEngineRef) {
151        self.inner.register_engine(engine);
152    }
153
154    /// Sets the topic stats.
155    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
156        self.inner.set_topic_stats_reporter(topic_stats_reporter);
157    }
158
159    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
160    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
161        match self.inner.get_engine(region_id, &RegionChange::None) {
162            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
163            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
164            Err(error::Error::RegionNotFound { .. }) => Ok(None),
165            Err(err) => Err(err),
166        }
167    }
168
169    /// Gets the MitoEngine if it's registered.
170    pub fn mito_engine(&self) -> Option<MitoEngine> {
171        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
172            Some(mito)
173        } else {
174            self.inner
175                .engines
176                .read()
177                .unwrap()
178                .get(MITO_ENGINE_NAME)
179                .cloned()
180                .and_then(|e| {
181                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
182                    if mito.is_none() {
183                        warn!("Mito engine not found in region server engines");
184                    }
185                    mito
186                })
187        }
188    }
189
190    #[tracing::instrument(skip_all)]
191    pub async fn handle_batch_open_requests(
192        &self,
193        parallelism: usize,
194        requests: Vec<(RegionId, RegionOpenRequest)>,
195        ignore_nonexistent_region: bool,
196    ) -> Result<Vec<RegionId>> {
197        self.inner
198            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
199            .await
200    }
201
202    #[tracing::instrument(skip_all)]
203    pub async fn handle_batch_catchup_requests(
204        &self,
205        parallelism: usize,
206        requests: Vec<(RegionId, RegionCatchupRequest)>,
207    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
208        self.inner
209            .handle_batch_catchup_requests(parallelism, requests)
210            .await
211    }
212
213    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
214    pub async fn handle_request(
215        &self,
216        region_id: RegionId,
217        request: RegionRequest,
218    ) -> Result<RegionResponse> {
219        self.inner.handle_request(region_id, request).await
220    }
221
222    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
223    async fn table_provider(
224        &self,
225        region_id: RegionId,
226        ctx: Option<QueryContextRef>,
227    ) -> Result<Arc<dyn TableProvider>> {
228        let status = self
229            .inner
230            .region_map
231            .get(&region_id)
232            .context(RegionNotFoundSnafu { region_id })?
233            .clone();
234        ensure!(
235            matches!(status, RegionEngineWithStatus::Ready(_)),
236            RegionNotReadySnafu { region_id }
237        );
238
239        self.inner
240            .table_provider_factory
241            .create(region_id, status.into_engine(), ctx)
242            .await
243            .context(ExecuteLogicalPlanSnafu)
244    }
245
246    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
247    pub async fn handle_remote_read(
248        &self,
249        request: api::v1::region::QueryRequest,
250        query_ctx: QueryContextRef,
251    ) -> Result<SendableRecordBatchStream> {
252        let _permit = if let Some(p) = &self.inner.parallelism {
253            Some(p.acquire().await?)
254        } else {
255            None
256        };
257
258        let region_id = RegionId::from_u64(request.region_id);
259        let catalog_list = Arc::new(NameAwareCatalogList::new(
260            self.clone(),
261            region_id,
262            query_ctx.clone(),
263        ));
264
265        if query_ctx.explain_verbose() {
266            common_telemetry::info!("Handle remote read for region: {}", region_id);
267        }
268
269        let decoder = self
270            .inner
271            .query_engine
272            .engine_context(query_ctx.clone())
273            .new_plan_decoder()
274            .context(NewPlanDecoderSnafu)?;
275
276        let plan = decoder
277            .decode(Bytes::from(request.plan), catalog_list, false)
278            .await
279            .context(DecodeLogicalPlanSnafu)?;
280
281        self.inner
282            .handle_read(
283                QueryRequest {
284                    header: request.header,
285                    region_id,
286                    plan,
287                },
288                query_ctx,
289            )
290            .await
291    }
292
293    #[tracing::instrument(skip_all)]
294    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
295        let _permit = if let Some(p) = &self.inner.parallelism {
296            Some(p.acquire().await?)
297        } else {
298            None
299        };
300
301        let ctx = request.header.as_ref().map(|h| h.into());
302        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
303
304        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
305            .context(DataFusionSnafu)?;
306        let mut injector = injector_builder
307            .build(self, request.region_id, query_ctx.clone())
308            .await?;
309
310        let plan = request
311            .plan
312            .rewrite(&mut injector)
313            .context(DataFusionSnafu)?
314            .data;
315
316        self.inner
317            .handle_read(QueryRequest { plan, ..request }, query_ctx)
318            .await
319    }
320
321    /// Returns all opened and reportable regions.
322    ///
323    /// Notes: except all metrics regions.
324    pub fn reportable_regions(&self) -> Vec<RegionStat> {
325        self.inner
326            .region_map
327            .iter()
328            .filter_map(|e| {
329                let region_id = *e.key();
330                // Filters out any regions whose role equals None.
331                e.role(region_id).map(|role| RegionStat {
332                    region_id,
333                    engine: e.value().name().to_string(),
334                    role,
335                })
336            })
337            .collect()
338    }
339
340    /// Returns the reportable topics.
341    pub fn topic_stats(&self) -> Vec<TopicStat> {
342        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
343        let Some(reporter) = reporter.as_mut() else {
344            return vec![];
345        };
346        reporter
347            .reportable_topics()
348            .into_iter()
349            .map(|stat| TopicStat {
350                topic_name: stat.topic,
351                record_size: stat.record_size,
352                record_num: stat.record_num,
353                latest_entry_id: stat.latest_entry_id,
354            })
355            .collect()
356    }
357
358    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
359        self.inner.region_map.get(&region_id).and_then(|engine| {
360            engine.role(region_id).map(|role| match role {
361                RegionRole::Follower => false,
362                RegionRole::Leader => true,
363                RegionRole::StagingLeader => true,
364                RegionRole::DowngradingLeader => true,
365            })
366        })
367    }
368
369    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
370        let engine = self
371            .inner
372            .region_map
373            .get(&region_id)
374            .with_context(|| RegionNotFoundSnafu { region_id })?;
375        engine
376            .set_region_role(region_id, role)
377            .with_context(|_| HandleRegionRequestSnafu { region_id })
378    }
379
380    /// Set region role state gracefully.
381    ///
382    /// For [SettableRegionRoleState::Follower]:
383    /// After the call returns, the engine ensures that
384    /// no **further** write or flush operations will succeed in this region.
385    ///
386    /// For [SettableRegionRoleState::DowngradingLeader]:
387    /// After the call returns, the engine ensures that
388    /// no **further** write operations will succeed in this region.
389    pub async fn set_region_role_state_gracefully(
390        &self,
391        region_id: RegionId,
392        state: SettableRegionRoleState,
393    ) -> Result<SetRegionRoleStateResponse> {
394        match self.inner.region_map.get(&region_id) {
395            Some(engine) => Ok(engine
396                .set_region_role_state_gracefully(region_id, state)
397                .await
398                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
399            None => Ok(SetRegionRoleStateResponse::NotFound),
400        }
401    }
402
403    pub fn runtime(&self) -> Runtime {
404        self.inner.runtime.clone()
405    }
406
407    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
408        match self.inner.region_map.get(&region_id) {
409            Some(e) => e.region_statistic(region_id),
410            None => None,
411        }
412    }
413
414    /// Stop the region server.
415    pub async fn stop(&self) -> Result<()> {
416        self.inner.stop().await
417    }
418
419    #[cfg(test)]
420    /// Registers a region for test purpose.
421    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
422        {
423            let mut engines = self.inner.engines.write().unwrap();
424            if !engines.contains_key(engine.name()) {
425                debug!("Registering test engine: {}", engine.name());
426                engines.insert(engine.name().to_string(), engine.clone());
427            }
428        }
429
430        self.inner
431            .region_map
432            .insert(region_id, RegionEngineWithStatus::Ready(engine));
433    }
434
435    async fn handle_batch_ddl_requests(
436        &self,
437        request: region_request::Body,
438    ) -> Result<RegionResponse> {
439        // Safety: we have already checked the request type in `RegionServer::handle()`.
440        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
441            .context(BuildRegionRequestsSnafu)?
442            .unwrap();
443        let tracing_context = TracingContext::from_current_span();
444
445        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
446        self.inner
447            .handle_batch_request(batch_request)
448            .trace(span)
449            .await
450    }
451
452    async fn handle_requests_in_parallel(
453        &self,
454        request: region_request::Body,
455    ) -> Result<RegionResponse> {
456        let requests =
457            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
458
459        // Try to optimize batch Put requests for metric engine
460        // Returns either Some(response) or None(requests_back)
461        match self.try_handle_metric_batch_puts(requests).await? {
462            Either::Left(response) => Ok(response),
463            Either::Right(requests) => {
464                // Fallback: original parallel processing
465                let tracing_context = TracingContext::from_current_span();
466                let join_tasks =
467                    requests
468                        .into_iter()
469                        .map(|(region_id, req): (RegionId, RegionRequest)| {
470                            let self_to_move = self;
471                            let span = tracing_context.attach(info_span!(
472                                "RegionServer::handle_region_request",
473                                region_id = region_id.to_string()
474                            ));
475                            async move {
476                                self_to_move
477                                    .handle_request(region_id, req)
478                                    .trace(span)
479                                    .await
480                            }
481                        });
482
483                let results = try_join_all(join_tasks).await?;
484                let mut affected_rows = 0;
485                let mut extensions = HashMap::new();
486                for result in results {
487                    affected_rows += result.affected_rows;
488                    extensions.extend(result.extensions);
489                }
490
491                Ok(RegionResponse {
492                    affected_rows,
493                    extensions,
494                    metadata: Vec::new(),
495                })
496            }
497        }
498    }
499
500    async fn handle_requests_in_serial(
501        &self,
502        request: region_request::Body,
503    ) -> Result<RegionResponse> {
504        let requests =
505            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
506        let tracing_context = TracingContext::from_current_span();
507
508        let mut affected_rows = 0;
509        let mut extensions = HashMap::new();
510        for (region_id, req) in requests {
511            let span = tracing_context.attach(info_span!(
512                "RegionServer::handle_region_request",
513                region_id = region_id.to_string()
514            ));
515            let result = self.handle_request(region_id, req).trace(span).await?;
516
517            affected_rows += result.affected_rows;
518            extensions.extend(result.extensions);
519        }
520
521        Ok(RegionResponse {
522            affected_rows,
523            extensions,
524            metadata: Vec::new(),
525        })
526    }
527
528    /// Attempts to optimize batch Put requests for metric engine.
529    ///
530    /// Returns Either::Left(response) if optimization succeeded,
531    /// or Either::Right(original_requests) to fall back to parallel processing.
532    ///
533    /// This avoids cloning requests when optimization cannot be applied.
534    async fn try_handle_metric_batch_puts(
535        &self,
536        requests: Vec<(RegionId, RegionRequest)>,
537    ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
538        if requests.is_empty() {
539            return Ok(Either::Right(requests));
540        }
541
542        // Quick check: verify first request is Put and is metric engine
543        if !matches!(requests[0].1, RegionRequest::Put(_)) {
544            return Ok(Either::Right(requests));
545        }
546        let first_region_id = requests[0].0;
547        let request_type = requests[0].1.request_type();
548
549        // SAFETY: If the first request belongs to metric engine, then ALL requests
550        // in this batch are guaranteed to belong to metric engine. This invariant
551        // is maintained by the request batching logic upstream.
552        let engine = match self
553            .inner
554            .get_engine(first_region_id, &RegionChange::None)?
555        {
556            CurrentEngine::Engine(e) => e,
557            _ => return Ok(Either::Right(requests)),
558        };
559
560        if engine.name() != METRIC_ENGINE_NAME {
561            return Ok(Either::Right(requests));
562        }
563
564        // Check if ALL requests are Put (now we know it's worth checking)
565        let mut all_puts = true;
566        for (_, req) in &requests {
567            if !matches!(req, RegionRequest::Put(_)) {
568                all_puts = false;
569                break;
570            }
571        }
572
573        if !all_puts {
574            return Ok(Either::Right(requests));
575        }
576
577        // Now extract Put requests by consuming ownership (zero clone!)
578        let put_requests = requests.into_iter().map(|(region_id, req)| {
579            if let RegionRequest::Put(put) = req {
580                (region_id, put)
581            } else {
582                unreachable!("Already checked all are Put")
583            }
584        });
585
586        // Downcast to MetricEngine and call batch API
587        let metric_engine =
588            engine
589                .as_any()
590                .downcast_ref::<MetricEngine>()
591                .context(UnexpectedSnafu {
592                    violated: "Failed to downcast to MetricEngine",
593                })?;
594
595        let tracing_context = TracingContext::from_current_span();
596        let batch_size = put_requests.len();
597        let span = tracing_context.attach(info_span!(
598            "RegionServer::handle_metric_batch_puts",
599            batch_size = batch_size,
600        ));
601        let result = metric_engine
602            .put_regions_batch(put_requests)
603            .trace(span)
604            .await
605            .map_err(BoxedError::new)
606            .context(HandleRegionRequestSnafu {
607                region_id: first_region_id,
608            });
609
610        match result {
611            Ok(total_affected) => {
612                crate::metrics::REGION_CHANGED_ROW_COUNT
613                    .with_label_values(&[request_type])
614                    .inc_by(total_affected as u64);
615                Ok(Either::Left(RegionResponse::new(total_affected)))
616            }
617            Err(err) => {
618                crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
619                    .with_label_values(&[request_type])
620                    .inc_by(batch_size as u64);
621                Err(err)
622            }
623        }
624    }
625
626    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
627        let region_id = RegionId::from_u64(request.region_id);
628        let manifest_info = request
629            .manifest_info
630            .context(error::MissingRequiredFieldSnafu {
631                name: "manifest_info",
632            })?;
633
634        let manifest_info = match manifest_info {
635            ManifestInfo::MitoManifestInfo(info) => {
636                RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
637            }
638            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
639                info.data_manifest_version,
640                0,
641                info.metadata_manifest_version,
642                0,
643            ),
644        };
645
646        let tracing_context = TracingContext::from_current_span();
647        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
648
649        self.sync_region(
650            region_id,
651            SyncRegionFromRequest::from_manifest(manifest_info),
652        )
653        .trace(span)
654        .await
655        .map(|_| RegionResponse::new(AffectedRows::default()))
656    }
657
658    /// Handles the ListMetadata request and retrieves metadata for specified regions.
659    ///
660    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
661    /// non-existing regions as `null`.
662    #[tracing::instrument(skip_all)]
663    async fn handle_list_metadata_request(
664        &self,
665        request: &ListMetadataRequest,
666    ) -> Result<RegionResponse> {
667        let mut region_metadatas = Vec::new();
668        // Collect metadata for each region
669        for region_id in &request.region_ids {
670            let region_id = RegionId::from_u64(*region_id);
671            // Get the engine.
672            let Some(engine) = self.find_engine(region_id)? else {
673                region_metadatas.push(None);
674                continue;
675            };
676
677            match engine.get_metadata(region_id).await {
678                Ok(metadata) => region_metadatas.push(Some(metadata)),
679                Err(err) => {
680                    if err.status_code() == StatusCode::RegionNotFound {
681                        region_metadatas.push(None);
682                    } else {
683                        Err(err).with_context(|_| GetRegionMetadataSnafu {
684                            engine: engine.name(),
685                            region_id,
686                        })?;
687                    }
688                }
689            }
690        }
691
692        // Serialize metadata to JSON
693        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
694
695        let response = RegionResponse::from_metadata(json_result);
696
697        Ok(response)
698    }
699
700    /// Sync region manifest and registers new opened logical regions.
701    pub async fn sync_region(
702        &self,
703        region_id: RegionId,
704        request: SyncRegionFromRequest,
705    ) -> Result<()> {
706        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
707            CurrentEngine::Engine(engine) => engine,
708            _ => {
709                return UnexpectedSnafu {
710                    violated: "unexpected EarlyReturn engine status for a ready region",
711                }
712                .fail();
713            }
714        };
715
716        self.inner
717            .handle_sync_region(&engine, region_id, request)
718            .await
719    }
720
721    /// Remaps manifests from old regions to new regions.
722    pub async fn remap_manifests(
723        &self,
724        request: RemapManifestsRequest,
725    ) -> Result<RemapManifestsResponse> {
726        let region_id = request.region_id;
727        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
728            CurrentEngine::Engine(engine) => engine,
729            _ => {
730                return UnexpectedSnafu {
731                    violated: "unexpected EarlyReturn engine status for a ready region",
732                }
733                .fail();
734            }
735        };
736
737        engine
738            .remap_manifests(request)
739            .await
740            .with_context(|_| HandleRegionRequestSnafu { region_id })
741    }
742
743    fn is_suspended(&self) -> bool {
744        self.suspend.load(Ordering::Relaxed)
745    }
746
747    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
748        self.suspend.clone()
749    }
750}
751
752#[async_trait]
753impl RegionServerHandler for RegionServer {
754    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
755        let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
756            .with_label_values(&[request.as_ref()]);
757        let response = match &request {
758            region_request::Body::Creates(_)
759            | region_request::Body::Drops(_)
760            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
761            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
762                self.handle_requests_in_parallel(request).await
763            }
764            region_request::Body::Sync(sync_request) => {
765                self.handle_sync_region_request(sync_request).await
766            }
767            region_request::Body::ListMetadata(list_metadata_request) => {
768                self.handle_list_metadata_request(list_metadata_request)
769                    .await
770            }
771            _ => self.handle_requests_in_serial(request).await,
772        }
773        .map_err(BoxedError::new)
774        .inspect_err(|_| {
775            failed_requests_cnt.inc();
776        })
777        .context(ExecuteGrpcRequestSnafu)?;
778
779        Ok(RegionResponseV1 {
780            header: Some(ResponseHeader {
781                status: Some(Status {
782                    status_code: StatusCode::Success as _,
783                    ..Default::default()
784                }),
785            }),
786            affected_rows: response.affected_rows as _,
787            extensions: response.extensions,
788            metadata: response.metadata,
789        })
790    }
791}
792
793#[async_trait]
794impl FlightCraft for RegionServer {
795    async fn do_get(
796        &self,
797        request: Request<Ticket>,
798    ) -> TonicResult<Response<TonicStream<FlightData>>> {
799        ensure!(!self.is_suspended(), SuspendedSnafu);
800
801        let ticket = request.into_inner().ticket;
802        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
803            .context(servers_error::InvalidFlightTicketSnafu)?;
804        let tracing_context = request
805            .header
806            .as_ref()
807            .map(|h| TracingContext::from_w3c(&h.tracing_context))
808            .unwrap_or_default();
809        let query_ctx = request
810            .header
811            .as_ref()
812            .map(|h| Arc::new(QueryContext::from(h)))
813            .unwrap_or(QueryContext::arc());
814
815        let result = self
816            .handle_remote_read(request, query_ctx.clone())
817            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
818            .await?;
819
820        let stream = Box::pin(FlightRecordBatchStream::new(
821            result,
822            tracing_context,
823            self.flight_compression,
824            query_ctx,
825        ));
826        Ok(Response::new(stream))
827    }
828}
829
830#[derive(Clone)]
831enum RegionEngineWithStatus {
832    // An opening, or creating region.
833    Registering(RegionEngineRef),
834    // A closing, or dropping region.
835    Deregistering(RegionEngineRef),
836    // A ready region.
837    Ready(RegionEngineRef),
838}
839
840impl RegionEngineWithStatus {
841    /// Returns [RegionEngineRef].
842    pub fn into_engine(self) -> RegionEngineRef {
843        match self {
844            RegionEngineWithStatus::Registering(engine) => engine,
845            RegionEngineWithStatus::Deregistering(engine) => engine,
846            RegionEngineWithStatus::Ready(engine) => engine,
847        }
848    }
849}
850
851impl Deref for RegionEngineWithStatus {
852    type Target = RegionEngineRef;
853
854    fn deref(&self) -> &Self::Target {
855        match self {
856            RegionEngineWithStatus::Registering(engine) => engine,
857            RegionEngineWithStatus::Deregistering(engine) => engine,
858            RegionEngineWithStatus::Ready(engine) => engine,
859        }
860    }
861}
862
863struct RegionServerInner {
864    engines: RwLock<HashMap<String, RegionEngineRef>>,
865    region_map: DashMap<RegionId, RegionEngineWithStatus>,
866    query_engine: QueryEngineRef,
867    runtime: Runtime,
868    event_listener: RegionServerEventListenerRef,
869    table_provider_factory: TableProviderFactoryRef,
870    /// The number of queries allowed to be executed at the same time.
871    /// Act as last line of defense on datanode to prevent query overloading.
872    parallelism: Option<RegionServerParallelism>,
873    /// The topic stats reporter.
874    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
875    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
876    /// server with a concrete engine; acceptable for now to fetch Mito-specific
877    /// info (e.g., list SSTs). Consider a diagnostics trait later.
878    mito_engine: RwLock<Option<MitoEngine>>,
879}
880
881struct RegionServerParallelism {
882    semaphore: Semaphore,
883    timeout: Duration,
884}
885
886impl RegionServerParallelism {
887    pub fn from_opts(
888        max_concurrent_queries: usize,
889        concurrent_query_limiter_timeout: Duration,
890    ) -> Option<Self> {
891        if max_concurrent_queries == 0 {
892            return None;
893        }
894        Some(RegionServerParallelism {
895            semaphore: Semaphore::new(max_concurrent_queries),
896            timeout: concurrent_query_limiter_timeout,
897        })
898    }
899
900    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
901        timeout(self.timeout, self.semaphore.acquire())
902            .await
903            .context(ConcurrentQueryLimiterTimeoutSnafu)?
904            .context(ConcurrentQueryLimiterClosedSnafu)
905    }
906}
907
908enum CurrentEngine {
909    Engine(RegionEngineRef),
910    EarlyReturn(AffectedRows),
911}
912
913impl Debug for CurrentEngine {
914    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
915        match self {
916            CurrentEngine::Engine(engine) => f
917                .debug_struct("CurrentEngine")
918                .field("engine", &engine.name())
919                .finish(),
920            CurrentEngine::EarlyReturn(rows) => f
921                .debug_struct("CurrentEngine")
922                .field("return", rows)
923                .finish(),
924        }
925    }
926}
927
928impl RegionServerInner {
929    pub fn new(
930        query_engine: QueryEngineRef,
931        runtime: Runtime,
932        event_listener: RegionServerEventListenerRef,
933        table_provider_factory: TableProviderFactoryRef,
934        parallelism: Option<RegionServerParallelism>,
935    ) -> Self {
936        Self {
937            engines: RwLock::new(HashMap::new()),
938            region_map: DashMap::new(),
939            query_engine,
940            runtime,
941            event_listener,
942            table_provider_factory,
943            parallelism,
944            topic_stats_reporter: RwLock::new(None),
945            mito_engine: RwLock::new(None),
946        }
947    }
948
949    pub fn register_engine(&self, engine: RegionEngineRef) {
950        let engine_name = engine.name();
951        if engine_name == MITO_ENGINE_NAME
952            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
953        {
954            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
955        }
956
957        info!("Region Engine {engine_name} is registered");
958        self.engines
959            .write()
960            .unwrap()
961            .insert(engine_name.to_string(), engine);
962    }
963
964    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
965        info!("Set topic stats reporter");
966        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
967    }
968
969    fn get_engine(
970        &self,
971        region_id: RegionId,
972        region_change: &RegionChange,
973    ) -> Result<CurrentEngine> {
974        let current_region_status = self.region_map.get(&region_id);
975
976        let engine = match region_change {
977            RegionChange::Register(attribute) => match current_region_status {
978                Some(status) => match status.clone() {
979                    RegionEngineWithStatus::Registering(engine) => engine,
980                    RegionEngineWithStatus::Deregistering(_) => {
981                        return error::RegionBusySnafu { region_id }.fail();
982                    }
983                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
984                },
985                _ => self
986                    .engines
987                    .read()
988                    .unwrap()
989                    .get(attribute.engine())
990                    .with_context(|| RegionEngineNotFoundSnafu {
991                        name: attribute.engine(),
992                    })?
993                    .clone(),
994            },
995            RegionChange::Deregisters => match current_region_status {
996                Some(status) => match status.clone() {
997                    RegionEngineWithStatus::Registering(_) => {
998                        return error::RegionBusySnafu { region_id }.fail();
999                    }
1000                    RegionEngineWithStatus::Deregistering(_) => {
1001                        return Ok(CurrentEngine::EarlyReturn(0));
1002                    }
1003                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1004                },
1005                None => return Ok(CurrentEngine::EarlyReturn(0)),
1006            },
1007            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1008                match current_region_status {
1009                    Some(status) => match status.clone() {
1010                        RegionEngineWithStatus::Registering(_) => {
1011                            return error::RegionNotReadySnafu { region_id }.fail();
1012                        }
1013                        RegionEngineWithStatus::Deregistering(_) => {
1014                            return error::RegionNotFoundSnafu { region_id }.fail();
1015                        }
1016                        RegionEngineWithStatus::Ready(engine) => engine,
1017                    },
1018                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
1019                }
1020            }
1021        };
1022
1023        Ok(CurrentEngine::Engine(engine))
1024    }
1025
1026    async fn handle_batch_open_requests_inner(
1027        &self,
1028        engine: RegionEngineRef,
1029        parallelism: usize,
1030        requests: Vec<(RegionId, RegionOpenRequest)>,
1031        ignore_nonexistent_region: bool,
1032    ) -> Result<Vec<RegionId>> {
1033        let region_changes = requests
1034            .iter()
1035            .map(|(region_id, open)| {
1036                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1037                Ok((*region_id, RegionChange::Register(attribute)))
1038            })
1039            .collect::<Result<HashMap<_, _>>>()?;
1040
1041        for (&region_id, region_change) in &region_changes {
1042            self.set_region_status_not_ready(region_id, &engine, region_change)
1043        }
1044
1045        let mut open_regions = Vec::with_capacity(requests.len());
1046        let mut errors = vec![];
1047        match engine
1048            .handle_batch_open_requests(parallelism, requests)
1049            .await
1050            .with_context(|_| HandleBatchOpenRequestSnafu)
1051        {
1052            Ok(results) => {
1053                for (region_id, result) in results {
1054                    let region_change = &region_changes[&region_id];
1055                    match result {
1056                        Ok(_) => {
1057                            if let Err(e) = self
1058                                .set_region_status_ready(region_id, engine.clone(), *region_change)
1059                                .await
1060                            {
1061                                error!(e; "Failed to set region to ready: {}", region_id);
1062                                errors.push(BoxedError::new(e));
1063                            } else {
1064                                open_regions.push(region_id)
1065                            }
1066                        }
1067                        Err(e) => {
1068                            self.unset_region_status(region_id, &engine, *region_change);
1069                            if e.status_code() == StatusCode::RegionNotFound
1070                                && ignore_nonexistent_region
1071                            {
1072                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1073                            } else {
1074                                error!(e; "Failed to open region: {}", region_id);
1075                                errors.push(e);
1076                            }
1077                        }
1078                    }
1079                }
1080            }
1081            Err(e) => {
1082                for (&region_id, region_change) in &region_changes {
1083                    self.unset_region_status(region_id, &engine, *region_change);
1084                }
1085                error!(e; "Failed to open batch regions");
1086                errors.push(BoxedError::new(e));
1087            }
1088        }
1089
1090        if !errors.is_empty() {
1091            return error::UnexpectedSnafu {
1092                // Returns the first error.
1093                violated: format!("Failed to open batch regions: {:?}", errors[0]),
1094            }
1095            .fail();
1096        }
1097
1098        Ok(open_regions)
1099    }
1100
1101    pub async fn handle_batch_open_requests(
1102        &self,
1103        parallelism: usize,
1104        requests: Vec<(RegionId, RegionOpenRequest)>,
1105        ignore_nonexistent_region: bool,
1106    ) -> Result<Vec<RegionId>> {
1107        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1108            HashMap::with_capacity(requests.len());
1109        for (region_id, request) in requests {
1110            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1111                requests.push((region_id, request));
1112            } else {
1113                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1114            }
1115        }
1116
1117        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1118        for (engine, requests) in engine_grouped_requests {
1119            let engine = self
1120                .engines
1121                .read()
1122                .unwrap()
1123                .get(&engine)
1124                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1125                .clone();
1126            results.push(
1127                self.handle_batch_open_requests_inner(
1128                    engine,
1129                    parallelism,
1130                    requests,
1131                    ignore_nonexistent_region,
1132                )
1133                .await,
1134            )
1135        }
1136
1137        Ok(results
1138            .into_iter()
1139            .collect::<Result<Vec<_>>>()?
1140            .into_iter()
1141            .flatten()
1142            .collect::<Vec<_>>())
1143    }
1144
1145    pub async fn handle_batch_catchup_requests_inner(
1146        &self,
1147        engine: RegionEngineRef,
1148        parallelism: usize,
1149        requests: Vec<(RegionId, RegionCatchupRequest)>,
1150    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1151        for (region_id, _) in &requests {
1152            self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1153        }
1154        let region_ids = requests
1155            .iter()
1156            .map(|(region_id, _)| *region_id)
1157            .collect::<Vec<_>>();
1158        let mut responses = Vec::with_capacity(requests.len());
1159        match engine
1160            .handle_batch_catchup_requests(parallelism, requests)
1161            .await
1162        {
1163            Ok(results) => {
1164                for (region_id, result) in results {
1165                    match result {
1166                        Ok(_) => {
1167                            if let Err(e) = self
1168                                .set_region_status_ready(
1169                                    region_id,
1170                                    engine.clone(),
1171                                    RegionChange::Catchup,
1172                                )
1173                                .await
1174                            {
1175                                error!(e; "Failed to set region to ready: {}", region_id);
1176                                responses.push((region_id, Err(BoxedError::new(e))));
1177                            } else {
1178                                responses.push((region_id, Ok(())));
1179                            }
1180                        }
1181                        Err(e) => {
1182                            self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1183                            error!(e; "Failed to catchup region: {}", region_id);
1184                            responses.push((region_id, Err(e)));
1185                        }
1186                    }
1187                }
1188            }
1189            Err(e) => {
1190                for region_id in region_ids {
1191                    self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1192                }
1193                error!(e; "Failed to catchup batch regions");
1194                return error::UnexpectedSnafu {
1195                    violated: format!("Failed to catchup batch regions: {:?}", e),
1196                }
1197                .fail();
1198            }
1199        }
1200
1201        Ok(responses)
1202    }
1203
1204    pub async fn handle_batch_catchup_requests(
1205        &self,
1206        parallelism: usize,
1207        requests: Vec<(RegionId, RegionCatchupRequest)>,
1208    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1209        let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1210
1211        let mut responses = Vec::with_capacity(requests.len());
1212        for (region_id, request) in requests {
1213            if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1214                match engine {
1215                    CurrentEngine::Engine(engine) => {
1216                        engine_grouped_requests
1217                            .entry(engine.name().to_string())
1218                            .or_default()
1219                            .push((region_id, request));
1220                    }
1221                    CurrentEngine::EarlyReturn(_) => {
1222                        return error::UnexpectedSnafu {
1223                            violated: format!("Unexpected engine type for region {}", region_id),
1224                        }
1225                        .fail();
1226                    }
1227                }
1228            } else {
1229                responses.push((
1230                    region_id,
1231                    Err(BoxedError::new(
1232                        error::RegionNotFoundSnafu { region_id }.build(),
1233                    )),
1234                ));
1235            }
1236        }
1237
1238        for (engine, requests) in engine_grouped_requests {
1239            let engine = self
1240                .engines
1241                .read()
1242                .unwrap()
1243                .get(&engine)
1244                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1245                .clone();
1246            responses.extend(
1247                self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1248                    .await?,
1249            );
1250        }
1251
1252        Ok(responses)
1253    }
1254
1255    // Handle requests in batch.
1256    //
1257    // limitation: all create requests must be in the same engine.
1258    pub async fn handle_batch_request(
1259        &self,
1260        batch_request: BatchRegionDdlRequest,
1261    ) -> Result<RegionResponse> {
1262        let region_changes = match &batch_request {
1263            BatchRegionDdlRequest::Create(requests) => requests
1264                .iter()
1265                .map(|(region_id, create)| {
1266                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
1267                    Ok((*region_id, RegionChange::Register(attribute)))
1268                })
1269                .collect::<Result<Vec<_>>>()?,
1270            BatchRegionDdlRequest::Drop(requests) => requests
1271                .iter()
1272                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1273                .collect::<Vec<_>>(),
1274            BatchRegionDdlRequest::Alter(requests) => requests
1275                .iter()
1276                .map(|(region_id, _)| (*region_id, RegionChange::None))
1277                .collect::<Vec<_>>(),
1278        };
1279
1280        // The ddl procedure will ensure all requests are in the same engine.
1281        // Therefore, we can get the engine from the first request.
1282        let (first_region_id, first_region_change) = region_changes.first().unwrap();
1283        let engine = match self.get_engine(*first_region_id, first_region_change)? {
1284            CurrentEngine::Engine(engine) => engine,
1285            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1286        };
1287
1288        for (region_id, region_change) in region_changes.iter() {
1289            self.set_region_status_not_ready(*region_id, &engine, region_change);
1290        }
1291
1292        let ddl_type = batch_request.request_type();
1293        let result = engine
1294            .handle_batch_ddl_requests(batch_request)
1295            .await
1296            .context(HandleBatchDdlRequestSnafu { ddl_type });
1297
1298        match result {
1299            Ok(result) => {
1300                for (region_id, region_change) in &region_changes {
1301                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1302                        .await?;
1303                }
1304
1305                Ok(RegionResponse {
1306                    affected_rows: result.affected_rows,
1307                    extensions: result.extensions,
1308                    metadata: Vec::new(),
1309                })
1310            }
1311            Err(err) => {
1312                for (region_id, region_change) in region_changes {
1313                    self.unset_region_status(region_id, &engine, region_change);
1314                }
1315
1316                Err(err)
1317            }
1318        }
1319    }
1320
1321    pub async fn handle_request(
1322        &self,
1323        region_id: RegionId,
1324        request: RegionRequest,
1325    ) -> Result<RegionResponse> {
1326        let request_type = request.request_type();
1327        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1328            .with_label_values(&[request_type])
1329            .start_timer();
1330
1331        let region_change = match &request {
1332            RegionRequest::Create(create) => {
1333                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1334                RegionChange::Register(attribute)
1335            }
1336            RegionRequest::Open(open) => {
1337                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1338                RegionChange::Register(attribute)
1339            }
1340            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1341            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1342                RegionChange::Ingest
1343            }
1344            RegionRequest::Alter(_)
1345            | RegionRequest::Flush(_)
1346            | RegionRequest::Compact(_)
1347            | RegionRequest::Truncate(_)
1348            | RegionRequest::BuildIndex(_)
1349            | RegionRequest::EnterStaging(_)
1350            | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1351            RegionRequest::Catchup(_) => RegionChange::Catchup,
1352        };
1353
1354        let engine = match self.get_engine(region_id, &region_change)? {
1355            CurrentEngine::Engine(engine) => engine,
1356            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1357        };
1358
1359        // Sets corresponding region status to registering/deregistering before the operation.
1360        self.set_region_status_not_ready(region_id, &engine, &region_change);
1361
1362        match engine
1363            .handle_request(region_id, request)
1364            .await
1365            .with_context(|_| HandleRegionRequestSnafu { region_id })
1366        {
1367            Ok(result) => {
1368                // Update metrics
1369                if matches!(region_change, RegionChange::Ingest) {
1370                    crate::metrics::REGION_CHANGED_ROW_COUNT
1371                        .with_label_values(&[request_type])
1372                        .inc_by(result.affected_rows as u64);
1373                }
1374                // Sets corresponding region status to ready.
1375                self.set_region_status_ready(region_id, engine.clone(), region_change)
1376                    .await?;
1377
1378                Ok(RegionResponse {
1379                    affected_rows: result.affected_rows,
1380                    extensions: result.extensions,
1381                    metadata: Vec::new(),
1382                })
1383            }
1384            Err(err) => {
1385                if matches!(region_change, RegionChange::Ingest) {
1386                    crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1387                        .with_label_values(&[request_type])
1388                        .inc();
1389                }
1390                // Removes the region status if the operation fails.
1391                self.unset_region_status(region_id, &engine, region_change);
1392                Err(err)
1393            }
1394        }
1395    }
1396
1397    /// Handles the sync region request.
1398    pub async fn handle_sync_region(
1399        &self,
1400        engine: &RegionEngineRef,
1401        region_id: RegionId,
1402        request: SyncRegionFromRequest,
1403    ) -> Result<()> {
1404        let Some(new_opened_regions) = engine
1405            .sync_region(region_id, request)
1406            .await
1407            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1408            .new_opened_logical_region_ids()
1409        else {
1410            return Ok(());
1411        };
1412
1413        for region in &new_opened_regions {
1414            self.region_map
1415                .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1416        }
1417        if !new_opened_regions.is_empty() {
1418            info!(
1419                region_id = %region_id,
1420                logical_region_count = new_opened_regions.len(),
1421                logical_regions = ?new_opened_regions,
1422                "Logical regions are registered"
1423            );
1424        }
1425
1426        Ok(())
1427    }
1428
1429    fn set_region_status_not_ready(
1430        &self,
1431        region_id: RegionId,
1432        engine: &RegionEngineRef,
1433        region_change: &RegionChange,
1434    ) {
1435        match region_change {
1436            RegionChange::Register(_) => {
1437                self.region_map.insert(
1438                    region_id,
1439                    RegionEngineWithStatus::Registering(engine.clone()),
1440                );
1441            }
1442            RegionChange::Deregisters => {
1443                self.region_map.insert(
1444                    region_id,
1445                    RegionEngineWithStatus::Deregistering(engine.clone()),
1446                );
1447            }
1448            _ => {}
1449        }
1450    }
1451
1452    fn unset_region_status(
1453        &self,
1454        region_id: RegionId,
1455        engine: &RegionEngineRef,
1456        region_change: RegionChange,
1457    ) {
1458        match region_change {
1459            RegionChange::None | RegionChange::Ingest => {}
1460            RegionChange::Register(_) => {
1461                self.region_map.remove(&region_id);
1462            }
1463            RegionChange::Deregisters => {
1464                self.region_map
1465                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1466            }
1467            RegionChange::Catchup => {}
1468        }
1469    }
1470
1471    async fn set_region_status_ready(
1472        &self,
1473        region_id: RegionId,
1474        engine: RegionEngineRef,
1475        region_change: RegionChange,
1476    ) -> Result<()> {
1477        let engine_type = engine.name();
1478        match region_change {
1479            RegionChange::None | RegionChange::Ingest => {}
1480            RegionChange::Register(attribute) => {
1481                info!(
1482                    "Region {region_id} is registered to engine {}",
1483                    attribute.engine()
1484                );
1485                self.region_map
1486                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1487
1488                match attribute {
1489                    RegionAttribute::Metric { physical } => {
1490                        if physical {
1491                            // Registers the logical regions belong to the physical region (`region_id`).
1492                            self.register_logical_regions(&engine, region_id).await?;
1493                            // We only send the `on_region_registered` event of the physical region.
1494                            self.event_listener.on_region_registered(region_id);
1495                        }
1496                    }
1497                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1498                    RegionAttribute::File => {
1499                        // do nothing
1500                    }
1501                }
1502            }
1503            RegionChange::Deregisters => {
1504                info!("Region {region_id} is deregistered from engine {engine_type}");
1505                self.region_map
1506                    .remove(&region_id)
1507                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1508                self.event_listener.on_region_deregistered(region_id);
1509            }
1510            RegionChange::Catchup => {
1511                if is_metric_engine(engine.name()) {
1512                    // Registers the logical regions belong to the physical region (`region_id`).
1513                    self.register_logical_regions(&engine, region_id).await?;
1514                }
1515            }
1516        }
1517        Ok(())
1518    }
1519
1520    async fn register_logical_regions(
1521        &self,
1522        engine: &RegionEngineRef,
1523        physical_region_id: RegionId,
1524    ) -> Result<()> {
1525        let metric_engine =
1526            engine
1527                .as_any()
1528                .downcast_ref::<MetricEngine>()
1529                .context(UnexpectedSnafu {
1530                    violated: format!(
1531                        "expecting engine type '{}', actual '{}'",
1532                        METRIC_ENGINE_NAME,
1533                        engine.name(),
1534                    ),
1535                })?;
1536
1537        let logical_regions = metric_engine
1538            .logical_regions(physical_region_id)
1539            .await
1540            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1541
1542        for region in &logical_regions {
1543            self.region_map
1544                .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1545        }
1546        if !logical_regions.is_empty() {
1547            info!(
1548                physical_region_id = %physical_region_id,
1549                logical_region_count = logical_regions.len(),
1550                logical_regions = ?logical_regions,
1551                "Logical regions are registered"
1552            );
1553        }
1554        Ok(())
1555    }
1556
1557    pub async fn handle_read(
1558        &self,
1559        request: QueryRequest,
1560        query_ctx: QueryContextRef,
1561    ) -> Result<SendableRecordBatchStream> {
1562        // TODO(ruihang): add metrics and set trace id
1563
1564        let result = self
1565            .query_engine
1566            .execute(request.plan, query_ctx)
1567            .await
1568            .context(ExecuteLogicalPlanSnafu)?;
1569
1570        match result.data {
1571            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1572                UnsupportedOutputSnafu { expected: "stream" }.fail()
1573            }
1574            OutputData::Stream(stream) => Ok(stream),
1575        }
1576    }
1577
1578    async fn stop(&self) -> Result<()> {
1579        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1580        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1581        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1582        //
1583        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1584        // it here, collect the values first then use later separately.
1585
1586        let regions = self
1587            .region_map
1588            .iter()
1589            .map(|x| (*x.key(), x.value().clone()))
1590            .collect::<Vec<_>>();
1591        let num_regions = regions.len();
1592
1593        for (region_id, engine) in regions {
1594            let closed = engine
1595                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1596                .await;
1597            match closed {
1598                Ok(_) => debug!("Region {region_id} is closed"),
1599                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1600            }
1601        }
1602        self.region_map.clear();
1603        info!("closed {num_regions} regions");
1604
1605        drop(self.mito_engine.write().unwrap().take());
1606        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1607        for (engine_name, engine) in engines {
1608            engine
1609                .stop()
1610                .await
1611                .context(StopRegionEngineSnafu { name: &engine_name })?;
1612            info!("Region engine {engine_name} is stopped");
1613        }
1614
1615        Ok(())
1616    }
1617}
1618
1619#[derive(Debug, Clone, Copy)]
1620enum RegionChange {
1621    None,
1622    Register(RegionAttribute),
1623    Deregisters,
1624    Catchup,
1625    Ingest,
1626}
1627
/// Returns `true` when `engine` is exactly `METRIC_ENGINE_NAME`.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1631
1632fn parse_region_attribute(
1633    engine: &str,
1634    options: &HashMap<String, String>,
1635) -> Result<RegionAttribute> {
1636    match engine {
1637        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1638        METRIC_ENGINE_NAME => {
1639            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1640
1641            Ok(RegionAttribute::Metric { physical })
1642        }
1643        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1644        _ => error::UnexpectedSnafu {
1645            violated: format!("Unknown engine: {}", engine),
1646        }
1647        .fail(),
1648    }
1649}
1650
/// Engine-specific attribute of a region, as produced by `parse_region_attribute`.
#[derive(Clone, Copy, Debug)]
enum RegionAttribute {
    /// The region belongs to the mito engine.
    Mito,
    /// The region belongs to the metric engine.
    Metric {
        /// `false` when the region options carry `LOGICAL_TABLE_METADATA_KEY`, `true` otherwise.
        physical: bool,
    },
    /// The region belongs to the file engine.
    File,
}
1657
1658impl RegionAttribute {
1659    fn engine(&self) -> &'static str {
1660        match self {
1661            RegionAttribute::Mito => MITO_ENGINE_NAME,
1662            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1663            RegionAttribute::File => FILE_ENGINE_NAME,
1664        }
1665    }
1666}
1667
1668#[cfg(test)]
1669mod tests {
1670
1671    use std::assert_matches;
1672
1673    use api::v1::SemanticType;
1674    use common_error::ext::ErrorExt;
1675    use datatypes::prelude::ConcreteDataType;
1676    use mito2::test_util::CreateRequestBuilder;
1677    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1678    use store_api::region_engine::RegionEngine;
1679    use store_api::region_request::{
1680        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1681    };
1682    use store_api::storage::RegionId;
1683
1684    use super::*;
1685    use crate::error::Result;
1686    use crate::tests::{MockRegionEngine, mock_region_server};
1687
1688    #[tokio::test]
1689    async fn test_region_registering() {
1690        common_telemetry::init_default_ut_logging();
1691
1692        let mut mock_region_server = mock_region_server();
1693        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1694        let engine_name = engine.name();
1695        mock_region_server.register_engine(engine.clone());
1696        let region_id = RegionId::new(1, 1);
1697        let builder = CreateRequestBuilder::new();
1698        let create_req = builder.build();
1699        // Tries to create/open a registering region.
1700        mock_region_server.inner.region_map.insert(
1701            region_id,
1702            RegionEngineWithStatus::Registering(engine.clone()),
1703        );
1704        let response = mock_region_server
1705            .handle_request(region_id, RegionRequest::Create(create_req))
1706            .await
1707            .unwrap();
1708        assert_eq!(response.affected_rows, 0);
1709        let status = mock_region_server
1710            .inner
1711            .region_map
1712            .get(&region_id)
1713            .unwrap()
1714            .clone();
1715        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1716
1717        mock_region_server.inner.region_map.insert(
1718            region_id,
1719            RegionEngineWithStatus::Registering(engine.clone()),
1720        );
1721        let response = mock_region_server
1722            .handle_request(
1723                region_id,
1724                RegionRequest::Open(RegionOpenRequest {
1725                    engine: engine_name.to_string(),
1726                    table_dir: String::new(),
1727                    path_type: PathType::Bare,
1728                    options: Default::default(),
1729                    skip_wal_replay: false,
1730                    checkpoint: None,
1731                }),
1732            )
1733            .await
1734            .unwrap();
1735        assert_eq!(response.affected_rows, 0);
1736        let status = mock_region_server
1737            .inner
1738            .region_map
1739            .get(&region_id)
1740            .unwrap()
1741            .clone();
1742        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1743    }
1744
1745    #[tokio::test]
1746    async fn test_region_deregistering() {
1747        common_telemetry::init_default_ut_logging();
1748
1749        let mut mock_region_server = mock_region_server();
1750        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1751
1752        mock_region_server.register_engine(engine.clone());
1753
1754        let region_id = RegionId::new(1, 1);
1755
1756        // Tries to drop/close a registering region.
1757        mock_region_server.inner.region_map.insert(
1758            region_id,
1759            RegionEngineWithStatus::Deregistering(engine.clone()),
1760        );
1761
1762        let response = mock_region_server
1763            .handle_request(
1764                region_id,
1765                RegionRequest::Drop(RegionDropRequest {
1766                    fast_path: false,
1767                    force: false,
1768                    partial_drop: false,
1769                }),
1770            )
1771            .await
1772            .unwrap();
1773        assert_eq!(response.affected_rows, 0);
1774
1775        let status = mock_region_server
1776            .inner
1777            .region_map
1778            .get(&region_id)
1779            .unwrap()
1780            .clone();
1781        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1782
1783        mock_region_server.inner.region_map.insert(
1784            region_id,
1785            RegionEngineWithStatus::Deregistering(engine.clone()),
1786        );
1787
1788        let response = mock_region_server
1789            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1790            .await
1791            .unwrap();
1792        assert_eq!(response.affected_rows, 0);
1793
1794        let status = mock_region_server
1795            .inner
1796            .region_map
1797            .get(&region_id)
1798            .unwrap()
1799            .clone();
1800        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1801    }
1802
1803    #[tokio::test]
1804    async fn test_region_not_ready() {
1805        common_telemetry::init_default_ut_logging();
1806
1807        let mut mock_region_server = mock_region_server();
1808        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1809
1810        mock_region_server.register_engine(engine.clone());
1811
1812        let region_id = RegionId::new(1, 1);
1813
1814        // Tries to drop/close a registering region.
1815        mock_region_server.inner.region_map.insert(
1816            region_id,
1817            RegionEngineWithStatus::Registering(engine.clone()),
1818        );
1819
1820        let err = mock_region_server
1821            .handle_request(
1822                region_id,
1823                RegionRequest::Truncate(RegionTruncateRequest::All),
1824            )
1825            .await
1826            .unwrap_err();
1827
1828        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1829    }
1830
1831    #[tokio::test]
1832    async fn test_region_request_failed() {
1833        common_telemetry::init_default_ut_logging();
1834
1835        let mut mock_region_server = mock_region_server();
1836        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1837            MITO_ENGINE_NAME,
1838            Box::new(|_region_id, _request| {
1839                error::UnexpectedSnafu {
1840                    violated: "test".to_string(),
1841                }
1842                .fail()
1843            }),
1844        );
1845
1846        mock_region_server.register_engine(engine.clone());
1847
1848        let region_id = RegionId::new(1, 1);
1849        let builder = CreateRequestBuilder::new();
1850        let create_req = builder.build();
1851        mock_region_server
1852            .handle_request(region_id, RegionRequest::Create(create_req))
1853            .await
1854            .unwrap_err();
1855
1856        let status = mock_region_server.inner.region_map.get(&region_id);
1857        assert!(status.is_none());
1858
1859        mock_region_server
1860            .inner
1861            .region_map
1862            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1863
1864        mock_region_server
1865            .handle_request(
1866                region_id,
1867                RegionRequest::Drop(RegionDropRequest {
1868                    fast_path: false,
1869                    force: false,
1870                    partial_drop: false,
1871                }),
1872            )
1873            .await
1874            .unwrap_err();
1875
1876        let status = mock_region_server.inner.region_map.get(&region_id);
1877        assert!(status.is_some());
1878    }
1879
1880    #[tokio::test]
1881    async fn test_batch_open_region_ignore_nonexistent_regions() {
1882        common_telemetry::init_default_ut_logging();
1883        let mut mock_region_server = mock_region_server();
1884        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1885            MITO_ENGINE_NAME,
1886            Box::new(|region_id, _request| {
1887                if region_id == RegionId::new(1, 1) {
1888                    error::RegionNotFoundSnafu { region_id }.fail()
1889                } else {
1890                    Ok(0)
1891                }
1892            }),
1893        );
1894        mock_region_server.register_engine(engine.clone());
1895
1896        let region_ids = mock_region_server
1897            .handle_batch_open_requests(
1898                8,
1899                vec![
1900                    (
1901                        RegionId::new(1, 1),
1902                        RegionOpenRequest {
1903                            engine: MITO_ENGINE_NAME.to_string(),
1904                            table_dir: String::new(),
1905                            path_type: PathType::Bare,
1906                            options: Default::default(),
1907                            skip_wal_replay: false,
1908                            checkpoint: None,
1909                        },
1910                    ),
1911                    (
1912                        RegionId::new(1, 2),
1913                        RegionOpenRequest {
1914                            engine: MITO_ENGINE_NAME.to_string(),
1915                            table_dir: String::new(),
1916                            path_type: PathType::Bare,
1917                            options: Default::default(),
1918                            skip_wal_replay: false,
1919                            checkpoint: None,
1920                        },
1921                    ),
1922                ],
1923                true,
1924            )
1925            .await
1926            .unwrap();
1927        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1928
1929        let err = mock_region_server
1930            .handle_batch_open_requests(
1931                8,
1932                vec![
1933                    (
1934                        RegionId::new(1, 1),
1935                        RegionOpenRequest {
1936                            engine: MITO_ENGINE_NAME.to_string(),
1937                            table_dir: String::new(),
1938                            path_type: PathType::Bare,
1939                            options: Default::default(),
1940                            skip_wal_replay: false,
1941                            checkpoint: None,
1942                        },
1943                    ),
1944                    (
1945                        RegionId::new(1, 2),
1946                        RegionOpenRequest {
1947                            engine: MITO_ENGINE_NAME.to_string(),
1948                            table_dir: String::new(),
1949                            path_type: PathType::Bare,
1950                            options: Default::default(),
1951                            skip_wal_replay: false,
1952                            checkpoint: None,
1953                        },
1954                    ),
1955                ],
1956                false,
1957            )
1958            .await
1959            .unwrap_err();
1960        assert_eq!(err.status_code(), StatusCode::Unexpected);
1961    }
1962
1963    struct CurrentEngineTest {
1964        region_id: RegionId,
1965        current_region_status: Option<RegionEngineWithStatus>,
1966        region_change: RegionChange,
1967        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1968    }
1969
1970    #[tokio::test]
1971    async fn test_current_engine() {
1972        common_telemetry::init_default_ut_logging();
1973
1974        let mut mock_region_server = mock_region_server();
1975        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1976        mock_region_server.register_engine(engine.clone());
1977
1978        let region_id = RegionId::new(1024, 1);
1979        let tests = vec![
1980            // RegionChange::None
1981            CurrentEngineTest {
1982                region_id,
1983                current_region_status: None,
1984                region_change: RegionChange::None,
1985                assert: Box::new(|result| {
1986                    let err = result.unwrap_err();
1987                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1988                }),
1989            },
1990            CurrentEngineTest {
1991                region_id,
1992                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1993                region_change: RegionChange::None,
1994                assert: Box::new(|result| {
1995                    let current_engine = result.unwrap();
1996                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1997                }),
1998            },
1999            CurrentEngineTest {
2000                region_id,
2001                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2002                region_change: RegionChange::None,
2003                assert: Box::new(|result| {
2004                    let err = result.unwrap_err();
2005                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2006                }),
2007            },
2008            CurrentEngineTest {
2009                region_id,
2010                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2011                region_change: RegionChange::None,
2012                assert: Box::new(|result| {
2013                    let err = result.unwrap_err();
2014                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2015                }),
2016            },
2017            // RegionChange::Register
2018            CurrentEngineTest {
2019                region_id,
2020                current_region_status: None,
2021                region_change: RegionChange::Register(RegionAttribute::Mito),
2022                assert: Box::new(|result| {
2023                    let current_engine = result.unwrap();
2024                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2025                }),
2026            },
2027            CurrentEngineTest {
2028                region_id,
2029                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2030                region_change: RegionChange::Register(RegionAttribute::Mito),
2031                assert: Box::new(|result| {
2032                    let current_engine = result.unwrap();
2033                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2034                }),
2035            },
2036            CurrentEngineTest {
2037                region_id,
2038                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2039                region_change: RegionChange::Register(RegionAttribute::Mito),
2040                assert: Box::new(|result| {
2041                    let err = result.unwrap_err();
2042                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
2043                }),
2044            },
2045            CurrentEngineTest {
2046                region_id,
2047                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2048                region_change: RegionChange::Register(RegionAttribute::Mito),
2049                assert: Box::new(|result| {
2050                    let current_engine = result.unwrap();
2051                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2052                }),
2053            },
2054            // RegionChange::Deregister
2055            CurrentEngineTest {
2056                region_id,
2057                current_region_status: None,
2058                region_change: RegionChange::Deregisters,
2059                assert: Box::new(|result| {
2060                    let current_engine = result.unwrap();
2061                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2062                }),
2063            },
2064            CurrentEngineTest {
2065                region_id,
2066                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2067                region_change: RegionChange::Deregisters,
2068                assert: Box::new(|result| {
2069                    let err = result.unwrap_err();
2070                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
2071                }),
2072            },
2073            CurrentEngineTest {
2074                region_id,
2075                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2076                region_change: RegionChange::Deregisters,
2077                assert: Box::new(|result| {
2078                    let current_engine = result.unwrap();
2079                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2080                }),
2081            },
2082            CurrentEngineTest {
2083                region_id,
2084                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2085                region_change: RegionChange::Deregisters,
2086                assert: Box::new(|result| {
2087                    let current_engine = result.unwrap();
2088                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2089                }),
2090            },
2091        ];
2092
2093        for test in tests {
2094            let CurrentEngineTest {
2095                region_id,
2096                current_region_status,
2097                region_change,
2098                assert,
2099            } = test;
2100
2101            // Sets up
2102            if let Some(status) = current_region_status {
2103                mock_region_server
2104                    .inner
2105                    .region_map
2106                    .insert(region_id, status);
2107            } else {
2108                mock_region_server.inner.region_map.remove(&region_id);
2109            }
2110
2111            let result = mock_region_server
2112                .inner
2113                .get_engine(region_id, &region_change);
2114
2115            assert(result);
2116        }
2117    }
2118
2119    #[tokio::test]
2120    async fn test_region_server_parallelism() {
2121        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2122        let first_query = p.acquire().await;
2123        assert!(first_query.is_ok());
2124        let second_query = p.acquire().await;
2125        assert!(second_query.is_ok());
2126        let third_query = p.acquire().await;
2127        assert!(third_query.is_err());
2128        let err = third_query.unwrap_err();
2129        assert_eq!(
2130            err.output_msg(),
2131            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2132        );
2133        drop(first_query);
2134        let forth_query = p.acquire().await;
2135        assert!(forth_query.is_ok());
2136    }
2137
2138    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2139        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2140        metadata_builder.push_column_metadata(ColumnMetadata {
2141            column_schema: datatypes::schema::ColumnSchema::new(
2142                "timestamp",
2143                ConcreteDataType::timestamp_nanosecond_datatype(),
2144                false,
2145            ),
2146            semantic_type: SemanticType::Timestamp,
2147            column_id: 0,
2148        });
2149        metadata_builder.push_column_metadata(ColumnMetadata {
2150            column_schema: datatypes::schema::ColumnSchema::new(
2151                "file",
2152                ConcreteDataType::string_datatype(),
2153                true,
2154            ),
2155            semantic_type: SemanticType::Tag,
2156            column_id: 1,
2157        });
2158        metadata_builder.push_column_metadata(ColumnMetadata {
2159            column_schema: datatypes::schema::ColumnSchema::new(
2160                "message",
2161                ConcreteDataType::string_datatype(),
2162                true,
2163            ),
2164            semantic_type: SemanticType::Field,
2165            column_id: 2,
2166        });
2167        metadata_builder.primary_key(vec![1]);
2168        metadata_builder.build().unwrap()
2169    }
2170
2171    #[tokio::test]
2172    async fn test_handle_list_metadata_request() {
2173        common_telemetry::init_default_ut_logging();
2174
2175        let mut mock_region_server = mock_region_server();
2176        let region_id_1 = RegionId::new(1, 0);
2177        let region_id_2 = RegionId::new(2, 0);
2178
2179        let metadata_1 = mock_region_metadata(region_id_1);
2180        let metadata_2 = mock_region_metadata(region_id_2);
2181        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2182
2183        let metadata_1 = Arc::new(metadata_1);
2184        let metadata_2 = Arc::new(metadata_2);
2185        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2186            MITO_ENGINE_NAME,
2187            Box::new(move |region_id| {
2188                if region_id == region_id_1 {
2189                    Ok(metadata_1.clone())
2190                } else if region_id == region_id_2 {
2191                    Ok(metadata_2.clone())
2192                } else {
2193                    error::RegionNotFoundSnafu { region_id }.fail()
2194                }
2195            }),
2196        );
2197
2198        mock_region_server.register_engine(engine.clone());
2199        mock_region_server
2200            .inner
2201            .region_map
2202            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2203        mock_region_server
2204            .inner
2205            .region_map
2206            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2207
2208        // All regions exist.
2209        let list_metadata_request = ListMetadataRequest {
2210            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2211        };
2212        let response = mock_region_server
2213            .handle_list_metadata_request(&list_metadata_request)
2214            .await
2215            .unwrap();
2216        let decoded_metadata: Vec<Option<RegionMetadata>> =
2217            serde_json::from_slice(&response.metadata).unwrap();
2218        assert_eq!(metadatas, decoded_metadata);
2219    }
2220
2221    #[tokio::test]
2222    async fn test_handle_list_metadata_not_found() {
2223        common_telemetry::init_default_ut_logging();
2224
2225        let mut mock_region_server = mock_region_server();
2226        let region_id_1 = RegionId::new(1, 0);
2227        let region_id_2 = RegionId::new(2, 0);
2228
2229        let metadata_1 = mock_region_metadata(region_id_1);
2230        let metadatas = vec![Some(metadata_1.clone()), None];
2231
2232        let metadata_1 = Arc::new(metadata_1);
2233        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2234            MITO_ENGINE_NAME,
2235            Box::new(move |region_id| {
2236                if region_id == region_id_1 {
2237                    Ok(metadata_1.clone())
2238                } else {
2239                    error::RegionNotFoundSnafu { region_id }.fail()
2240                }
2241            }),
2242        );
2243
2244        mock_region_server.register_engine(engine.clone());
2245        mock_region_server
2246            .inner
2247            .region_map
2248            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2249
2250        // Not in region map.
2251        let list_metadata_request = ListMetadataRequest {
2252            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2253        };
2254        let response = mock_region_server
2255            .handle_list_metadata_request(&list_metadata_request)
2256            .await
2257            .unwrap();
2258        let decoded_metadata: Vec<Option<RegionMetadata>> =
2259            serde_json::from_slice(&response.metadata).unwrap();
2260        assert_eq!(metadatas, decoded_metadata);
2261
2262        // Not in region engine.
2263        mock_region_server
2264            .inner
2265            .region_map
2266            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2267        let response = mock_region_server
2268            .handle_list_metadata_request(&list_metadata_request)
2269            .await
2270            .unwrap();
2271        let decoded_metadata: Vec<Option<RegionMetadata>> =
2272            serde_json::from_slice(&response.metadata).unwrap();
2273        assert_eq!(metadatas, decoded_metadata);
2274    }
2275
2276    #[tokio::test]
2277    async fn test_handle_list_metadata_failed() {
2278        common_telemetry::init_default_ut_logging();
2279
2280        let mut mock_region_server = mock_region_server();
2281        let region_id_1 = RegionId::new(1, 0);
2282
2283        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2284            MITO_ENGINE_NAME,
2285            Box::new(move |region_id| {
2286                error::UnexpectedSnafu {
2287                    violated: format!("Failed to get region {region_id}"),
2288                }
2289                .fail()
2290            }),
2291        );
2292
2293        mock_region_server.register_engine(engine.clone());
2294        mock_region_server
2295            .inner
2296            .region_map
2297            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2298
2299        // Failed to get.
2300        let list_metadata_request = ListMetadataRequest {
2301            region_ids: vec![region_id_1.as_u64()],
2302        };
2303        mock_region_server
2304            .handle_list_metadata_request(&list_metadata_request)
2305            .await
2306            .unwrap_err();
2307    }
2308}