datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use either::Either;
48use futures_util::future::try_join_all;
49use metric_engine::engine::MetricEngine;
50use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
51use prost::Message;
52use query::QueryEngineRef;
53pub use query::dummy_catalog::{
54    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
55};
56use serde_json;
57use servers::error::{
58    self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
59};
60use servers::grpc::FlightCompression;
61use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
62use servers::grpc::region_server::RegionServerHandler;
63use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metric_engine_consts::{
66    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
67};
68use store_api::region_engine::{
69    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
70    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
71    SyncRegionFromRequest,
72};
73use store_api::region_request::{
74    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
75    RegionOpenRequest, RegionRequest,
76};
77use store_api::storage::RegionId;
78use tokio::sync::{Semaphore, SemaphorePermit};
79use tokio::time::timeout;
80use tonic::{Request, Response, Result as TonicResult};
81
82use crate::error::{
83    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
84    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
85    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
86    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
87    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
88    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
89};
90use crate::event_listener::RegionServerEventListenerRef;
91use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
92
/// Handle to the datanode's region server.
///
/// The type is `Clone`; the inner state and the suspend flag are both held in
/// `Arc`s, so clones share them.
#[derive(Clone)]
pub struct RegionServer {
    /// Shared server state that most methods delegate to.
    inner: Arc<RegionServerInner>,
    /// Compression setting handed to the Flight record batch stream in `do_get`.
    flight_compression: FlightCompression,
    /// Suspension flag; while it reads `true`, `do_get` rejects requests.
    suspend: Arc<AtomicBool>,
}
99
/// A single region entry as produced by [`RegionServer::reportable_regions`].
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine the region is registered with.
    pub engine: String,
    /// Role the engine reported for this region.
    pub role: RegionRole,
}
105
106impl RegionServer {
107    pub fn new(
108        query_engine: QueryEngineRef,
109        runtime: Runtime,
110        event_listener: RegionServerEventListenerRef,
111        flight_compression: FlightCompression,
112    ) -> Self {
113        Self::with_table_provider(
114            query_engine,
115            runtime,
116            event_listener,
117            Arc::new(DummyTableProviderFactory),
118            0,
119            Duration::from_millis(0),
120            flight_compression,
121        )
122    }
123
124    pub fn with_table_provider(
125        query_engine: QueryEngineRef,
126        runtime: Runtime,
127        event_listener: RegionServerEventListenerRef,
128        table_provider_factory: TableProviderFactoryRef,
129        max_concurrent_queries: usize,
130        concurrent_query_limiter_timeout: Duration,
131        flight_compression: FlightCompression,
132    ) -> Self {
133        Self {
134            inner: Arc::new(RegionServerInner::new(
135                query_engine,
136                runtime,
137                event_listener,
138                table_provider_factory,
139                RegionServerParallelism::from_opts(
140                    max_concurrent_queries,
141                    concurrent_query_limiter_timeout,
142                ),
143            )),
144            flight_compression,
145            suspend: Arc::new(AtomicBool::new(false)),
146        }
147    }
148
149    /// Registers an engine.
150    pub fn register_engine(&mut self, engine: RegionEngineRef) {
151        self.inner.register_engine(engine);
152    }
153
154    /// Sets the topic stats.
155    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
156        self.inner.set_topic_stats_reporter(topic_stats_reporter);
157    }
158
159    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
160    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
161        match self.inner.get_engine(region_id, &RegionChange::None) {
162            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
163            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
164            Err(error::Error::RegionNotFound { .. }) => Ok(None),
165            Err(err) => Err(err),
166        }
167    }
168
169    /// Gets the MitoEngine if it's registered.
170    pub fn mito_engine(&self) -> Option<MitoEngine> {
171        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
172            Some(mito)
173        } else {
174            self.inner
175                .engines
176                .read()
177                .unwrap()
178                .get(MITO_ENGINE_NAME)
179                .cloned()
180                .and_then(|e| {
181                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
182                    if mito.is_none() {
183                        warn!("Mito engine not found in region server engines");
184                    }
185                    mito
186                })
187        }
188    }
189
190    #[tracing::instrument(skip_all)]
191    pub async fn handle_batch_open_requests(
192        &self,
193        parallelism: usize,
194        requests: Vec<(RegionId, RegionOpenRequest)>,
195        ignore_nonexistent_region: bool,
196    ) -> Result<Vec<RegionId>> {
197        self.inner
198            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
199            .await
200    }
201
202    #[tracing::instrument(skip_all)]
203    pub async fn handle_batch_catchup_requests(
204        &self,
205        parallelism: usize,
206        requests: Vec<(RegionId, RegionCatchupRequest)>,
207    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
208        self.inner
209            .handle_batch_catchup_requests(parallelism, requests)
210            .await
211    }
212
213    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
214    pub async fn handle_request(
215        &self,
216        region_id: RegionId,
217        request: RegionRequest,
218    ) -> Result<RegionResponse> {
219        self.inner.handle_request(region_id, request).await
220    }
221
222    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
223    async fn table_provider(
224        &self,
225        region_id: RegionId,
226        ctx: Option<QueryContextRef>,
227    ) -> Result<Arc<dyn TableProvider>> {
228        let status = self
229            .inner
230            .region_map
231            .get(&region_id)
232            .context(RegionNotFoundSnafu { region_id })?
233            .clone();
234        ensure!(
235            matches!(status, RegionEngineWithStatus::Ready(_)),
236            RegionNotReadySnafu { region_id }
237        );
238
239        self.inner
240            .table_provider_factory
241            .create(region_id, status.into_engine(), ctx)
242            .await
243            .context(ExecuteLogicalPlanSnafu)
244    }
245
246    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
247    pub async fn handle_remote_read(
248        &self,
249        request: api::v1::region::QueryRequest,
250        query_ctx: QueryContextRef,
251    ) -> Result<SendableRecordBatchStream> {
252        let _permit = if let Some(p) = &self.inner.parallelism {
253            Some(p.acquire().await?)
254        } else {
255            None
256        };
257
258        let region_id = RegionId::from_u64(request.region_id);
259        let catalog_list = Arc::new(NameAwareCatalogList::new(
260            self.clone(),
261            region_id,
262            query_ctx.clone(),
263        ));
264
265        if query_ctx.explain_verbose() {
266            common_telemetry::info!("Handle remote read for region: {}", region_id);
267        }
268
269        let decoder = self
270            .inner
271            .query_engine
272            .engine_context(query_ctx.clone())
273            .new_plan_decoder()
274            .context(NewPlanDecoderSnafu)?;
275
276        let plan = decoder
277            .decode(Bytes::from(request.plan), catalog_list, false)
278            .await
279            .context(DecodeLogicalPlanSnafu)?;
280
281        self.inner
282            .handle_read(
283                QueryRequest {
284                    header: request.header,
285                    region_id,
286                    plan,
287                },
288                query_ctx,
289            )
290            .await
291    }
292
293    #[tracing::instrument(skip_all)]
294    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
295        let _permit = if let Some(p) = &self.inner.parallelism {
296            Some(p.acquire().await?)
297        } else {
298            None
299        };
300
301        let ctx = request.header.as_ref().map(|h| h.into());
302        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
303
304        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
305            .context(DataFusionSnafu)?;
306        let mut injector = injector_builder
307            .build(self, request.region_id, query_ctx.clone())
308            .await?;
309
310        let plan = request
311            .plan
312            .rewrite(&mut injector)
313            .context(DataFusionSnafu)?
314            .data;
315
316        self.inner
317            .handle_read(QueryRequest { plan, ..request }, query_ctx)
318            .await
319    }
320
321    /// Returns all opened and reportable regions.
322    ///
323    /// Notes: except all metrics regions.
324    pub fn reportable_regions(&self) -> Vec<RegionStat> {
325        self.inner
326            .region_map
327            .iter()
328            .filter_map(|e| {
329                let region_id = *e.key();
330                // Filters out any regions whose role equals None.
331                e.role(region_id).map(|role| RegionStat {
332                    region_id,
333                    engine: e.value().name().to_string(),
334                    role,
335                })
336            })
337            .collect()
338    }
339
340    /// Returns the reportable topics.
341    pub fn topic_stats(&self) -> Vec<TopicStat> {
342        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
343        let Some(reporter) = reporter.as_mut() else {
344            return vec![];
345        };
346        reporter
347            .reportable_topics()
348            .into_iter()
349            .map(|stat| TopicStat {
350                topic_name: stat.topic,
351                record_size: stat.record_size,
352                record_num: stat.record_num,
353                latest_entry_id: stat.latest_entry_id,
354            })
355            .collect()
356    }
357
358    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
359        self.inner.region_map.get(&region_id).and_then(|engine| {
360            engine.role(region_id).map(|role| match role {
361                RegionRole::Follower => false,
362                RegionRole::Leader => true,
363                RegionRole::DowngradingLeader => true,
364            })
365        })
366    }
367
368    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
369        let engine = self
370            .inner
371            .region_map
372            .get(&region_id)
373            .with_context(|| RegionNotFoundSnafu { region_id })?;
374        engine
375            .set_region_role(region_id, role)
376            .with_context(|_| HandleRegionRequestSnafu { region_id })
377    }
378
379    /// Set region role state gracefully.
380    ///
381    /// For [SettableRegionRoleState::Follower]:
382    /// After the call returns, the engine ensures that
383    /// no **further** write or flush operations will succeed in this region.
384    ///
385    /// For [SettableRegionRoleState::DowngradingLeader]:
386    /// After the call returns, the engine ensures that
387    /// no **further** write operations will succeed in this region.
388    pub async fn set_region_role_state_gracefully(
389        &self,
390        region_id: RegionId,
391        state: SettableRegionRoleState,
392    ) -> Result<SetRegionRoleStateResponse> {
393        match self.inner.region_map.get(&region_id) {
394            Some(engine) => Ok(engine
395                .set_region_role_state_gracefully(region_id, state)
396                .await
397                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
398            None => Ok(SetRegionRoleStateResponse::NotFound),
399        }
400    }
401
402    pub fn runtime(&self) -> Runtime {
403        self.inner.runtime.clone()
404    }
405
406    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
407        match self.inner.region_map.get(&region_id) {
408            Some(e) => e.region_statistic(region_id),
409            None => None,
410        }
411    }
412
413    /// Stop the region server.
414    pub async fn stop(&self) -> Result<()> {
415        self.inner.stop().await
416    }
417
418    #[cfg(test)]
419    /// Registers a region for test purpose.
420    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
421        {
422            let mut engines = self.inner.engines.write().unwrap();
423            if !engines.contains_key(engine.name()) {
424                debug!("Registering test engine: {}", engine.name());
425                engines.insert(engine.name().to_string(), engine.clone());
426            }
427        }
428
429        self.inner
430            .region_map
431            .insert(region_id, RegionEngineWithStatus::Ready(engine));
432    }
433
434    async fn handle_batch_ddl_requests(
435        &self,
436        request: region_request::Body,
437    ) -> Result<RegionResponse> {
438        // Safety: we have already checked the request type in `RegionServer::handle()`.
439        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
440            .context(BuildRegionRequestsSnafu)?
441            .unwrap();
442        let tracing_context = TracingContext::from_current_span();
443
444        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
445        self.inner
446            .handle_batch_request(batch_request)
447            .trace(span)
448            .await
449    }
450
451    async fn handle_requests_in_parallel(
452        &self,
453        request: region_request::Body,
454    ) -> Result<RegionResponse> {
455        let requests =
456            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
457
458        // Try to optimize batch Put requests for metric engine
459        // Returns either Some(response) or None(requests_back)
460        match self.try_handle_metric_batch_puts(requests).await? {
461            Either::Left(response) => Ok(response),
462            Either::Right(requests) => {
463                // Fallback: original parallel processing
464                let tracing_context = TracingContext::from_current_span();
465                let join_tasks =
466                    requests
467                        .into_iter()
468                        .map(|(region_id, req): (RegionId, RegionRequest)| {
469                            let self_to_move = self;
470                            let span = tracing_context.attach(info_span!(
471                                "RegionServer::handle_region_request",
472                                region_id = region_id.to_string()
473                            ));
474                            async move {
475                                self_to_move
476                                    .handle_request(region_id, req)
477                                    .trace(span)
478                                    .await
479                            }
480                        });
481
482                let results = try_join_all(join_tasks).await?;
483                let mut affected_rows = 0;
484                let mut extensions = HashMap::new();
485                for result in results {
486                    affected_rows += result.affected_rows;
487                    extensions.extend(result.extensions);
488                }
489
490                Ok(RegionResponse {
491                    affected_rows,
492                    extensions,
493                    metadata: Vec::new(),
494                })
495            }
496        }
497    }
498
499    async fn handle_requests_in_serial(
500        &self,
501        request: region_request::Body,
502    ) -> Result<RegionResponse> {
503        let requests =
504            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
505        let tracing_context = TracingContext::from_current_span();
506
507        let mut affected_rows = 0;
508        let mut extensions = HashMap::new();
509        for (region_id, req) in requests {
510            let span = tracing_context.attach(info_span!(
511                "RegionServer::handle_region_request",
512                region_id = region_id.to_string()
513            ));
514            let result = self.handle_request(region_id, req).trace(span).await?;
515
516            affected_rows += result.affected_rows;
517            extensions.extend(result.extensions);
518        }
519
520        Ok(RegionResponse {
521            affected_rows,
522            extensions,
523            metadata: Vec::new(),
524        })
525    }
526
527    /// Attempts to optimize batch Put requests for metric engine.
528    ///
529    /// Returns Either::Left(response) if optimization succeeded,
530    /// or Either::Right(original_requests) to fall back to parallel processing.
531    ///
532    /// This avoids cloning requests when optimization cannot be applied.
533    async fn try_handle_metric_batch_puts(
534        &self,
535        requests: Vec<(RegionId, RegionRequest)>,
536    ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
537        if requests.is_empty() {
538            return Ok(Either::Right(requests));
539        }
540
541        // Quick check: verify first request is Put and is metric engine
542        if !matches!(requests[0].1, RegionRequest::Put(_)) {
543            return Ok(Either::Right(requests));
544        }
545        let first_region_id = requests[0].0;
546        let request_type = requests[0].1.request_type();
547
548        // SAFETY: If the first request belongs to metric engine, then ALL requests
549        // in this batch are guaranteed to belong to metric engine. This invariant
550        // is maintained by the request batching logic upstream.
551        let engine = match self
552            .inner
553            .get_engine(first_region_id, &RegionChange::None)?
554        {
555            CurrentEngine::Engine(e) => e,
556            _ => return Ok(Either::Right(requests)),
557        };
558
559        if engine.name() != METRIC_ENGINE_NAME {
560            return Ok(Either::Right(requests));
561        }
562
563        // Check if ALL requests are Put (now we know it's worth checking)
564        let mut all_puts = true;
565        for (_, req) in &requests {
566            if !matches!(req, RegionRequest::Put(_)) {
567                all_puts = false;
568                break;
569            }
570        }
571
572        if !all_puts {
573            return Ok(Either::Right(requests));
574        }
575
576        // Now extract Put requests by consuming ownership (zero clone!)
577        let put_requests = requests.into_iter().map(|(region_id, req)| {
578            if let RegionRequest::Put(put) = req {
579                (region_id, put)
580            } else {
581                unreachable!("Already checked all are Put")
582            }
583        });
584
585        // Downcast to MetricEngine and call batch API
586        let metric_engine =
587            engine
588                .as_any()
589                .downcast_ref::<MetricEngine>()
590                .context(UnexpectedSnafu {
591                    violated: "Failed to downcast to MetricEngine",
592                })?;
593
594        let tracing_context = TracingContext::from_current_span();
595        let batch_size = put_requests.len();
596        let span = tracing_context.attach(info_span!(
597            "RegionServer::handle_metric_batch_puts",
598            batch_size = batch_size,
599        ));
600        let result = metric_engine
601            .put_regions_batch(put_requests)
602            .trace(span)
603            .await
604            .map_err(BoxedError::new)
605            .context(HandleRegionRequestSnafu {
606                region_id: first_region_id,
607            });
608
609        match result {
610            Ok(total_affected) => {
611                crate::metrics::REGION_CHANGED_ROW_COUNT
612                    .with_label_values(&[request_type])
613                    .inc_by(total_affected as u64);
614                Ok(Either::Left(RegionResponse::new(total_affected)))
615            }
616            Err(err) => {
617                crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
618                    .with_label_values(&[request_type])
619                    .inc_by(batch_size as u64);
620                Err(err)
621            }
622        }
623    }
624
625    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
626        let region_id = RegionId::from_u64(request.region_id);
627        let manifest_info = request
628            .manifest_info
629            .context(error::MissingRequiredFieldSnafu {
630                name: "manifest_info",
631            })?;
632
633        let manifest_info = match manifest_info {
634            ManifestInfo::MitoManifestInfo(info) => {
635                RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
636            }
637            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
638                info.data_manifest_version,
639                0,
640                info.metadata_manifest_version,
641                0,
642            ),
643        };
644
645        let tracing_context = TracingContext::from_current_span();
646        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
647
648        self.sync_region(
649            region_id,
650            SyncRegionFromRequest::from_manifest(manifest_info),
651        )
652        .trace(span)
653        .await
654        .map(|_| RegionResponse::new(AffectedRows::default()))
655    }
656
657    /// Handles the ListMetadata request and retrieves metadata for specified regions.
658    ///
659    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
660    /// non-existing regions as `null`.
661    #[tracing::instrument(skip_all)]
662    async fn handle_list_metadata_request(
663        &self,
664        request: &ListMetadataRequest,
665    ) -> Result<RegionResponse> {
666        let mut region_metadatas = Vec::new();
667        // Collect metadata for each region
668        for region_id in &request.region_ids {
669            let region_id = RegionId::from_u64(*region_id);
670            // Get the engine.
671            let Some(engine) = self.find_engine(region_id)? else {
672                region_metadatas.push(None);
673                continue;
674            };
675
676            match engine.get_metadata(region_id).await {
677                Ok(metadata) => region_metadatas.push(Some(metadata)),
678                Err(err) => {
679                    if err.status_code() == StatusCode::RegionNotFound {
680                        region_metadatas.push(None);
681                    } else {
682                        Err(err).with_context(|_| GetRegionMetadataSnafu {
683                            engine: engine.name(),
684                            region_id,
685                        })?;
686                    }
687                }
688            }
689        }
690
691        // Serialize metadata to JSON
692        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
693
694        let response = RegionResponse::from_metadata(json_result);
695
696        Ok(response)
697    }
698
699    /// Sync region manifest and registers new opened logical regions.
700    pub async fn sync_region(
701        &self,
702        region_id: RegionId,
703        request: SyncRegionFromRequest,
704    ) -> Result<()> {
705        let engine_with_status = self
706            .inner
707            .region_map
708            .get(&region_id)
709            .with_context(|| RegionNotFoundSnafu { region_id })?;
710
711        self.inner
712            .handle_sync_region(engine_with_status.engine(), region_id, request)
713            .await
714    }
715
716    /// Remaps manifests from old regions to new regions.
717    pub async fn remap_manifests(
718        &self,
719        request: RemapManifestsRequest,
720    ) -> Result<RemapManifestsResponse> {
721        let region_id = request.region_id;
722        let engine_with_status = self
723            .inner
724            .region_map
725            .get(&region_id)
726            .with_context(|| RegionNotFoundSnafu { region_id })?;
727
728        engine_with_status
729            .engine()
730            .remap_manifests(request)
731            .await
732            .with_context(|_| HandleRegionRequestSnafu { region_id })
733    }
734
735    fn is_suspended(&self) -> bool {
736        self.suspend.load(Ordering::Relaxed)
737    }
738
739    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
740        self.suspend.clone()
741    }
742}
743
744#[async_trait]
745impl RegionServerHandler for RegionServer {
746    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
747        let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
748            .with_label_values(&[request.as_ref()]);
749        let response = match &request {
750            region_request::Body::Creates(_)
751            | region_request::Body::Drops(_)
752            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
753            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
754                self.handle_requests_in_parallel(request).await
755            }
756            region_request::Body::Sync(sync_request) => {
757                self.handle_sync_region_request(sync_request).await
758            }
759            region_request::Body::ListMetadata(list_metadata_request) => {
760                self.handle_list_metadata_request(list_metadata_request)
761                    .await
762            }
763            _ => self.handle_requests_in_serial(request).await,
764        }
765        .map_err(BoxedError::new)
766        .inspect_err(|_| {
767            failed_requests_cnt.inc();
768        })
769        .context(ExecuteGrpcRequestSnafu)?;
770
771        Ok(RegionResponseV1 {
772            header: Some(ResponseHeader {
773                status: Some(Status {
774                    status_code: StatusCode::Success as _,
775                    ..Default::default()
776                }),
777            }),
778            affected_rows: response.affected_rows as _,
779            extensions: response.extensions,
780            metadata: response.metadata,
781        })
782    }
783}
784
785#[async_trait]
786impl FlightCraft for RegionServer {
787    async fn do_get(
788        &self,
789        request: Request<Ticket>,
790    ) -> TonicResult<Response<TonicStream<FlightData>>> {
791        ensure!(!self.is_suspended(), SuspendedSnafu);
792
793        let ticket = request.into_inner().ticket;
794        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
795            .context(servers_error::InvalidFlightTicketSnafu)?;
796        let tracing_context = request
797            .header
798            .as_ref()
799            .map(|h| TracingContext::from_w3c(&h.tracing_context))
800            .unwrap_or_default();
801        let query_ctx = request
802            .header
803            .as_ref()
804            .map(|h| Arc::new(QueryContext::from(h)))
805            .unwrap_or(QueryContext::arc());
806
807        let result = self
808            .handle_remote_read(request, query_ctx.clone())
809            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
810            .await?;
811
812        let stream = Box::pin(FlightRecordBatchStream::new(
813            result,
814            tracing_context,
815            self.flight_compression,
816            query_ctx,
817        ));
818        Ok(Response::new(stream))
819    }
820}
821
/// A region's engine tagged with the region's lifecycle status.
///
/// Every variant carries the engine, so it can be reached regardless of status.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
831
832impl RegionEngineWithStatus {
833    /// Returns [RegionEngineRef].
834    pub fn into_engine(self) -> RegionEngineRef {
835        match self {
836            RegionEngineWithStatus::Registering(engine) => engine,
837            RegionEngineWithStatus::Deregistering(engine) => engine,
838            RegionEngineWithStatus::Ready(engine) => engine,
839        }
840    }
841
842    /// Returns [RegionEngineRef] reference.
843    pub fn engine(&self) -> &RegionEngineRef {
844        match self {
845            RegionEngineWithStatus::Registering(engine) => engine,
846            RegionEngineWithStatus::Deregistering(engine) => engine,
847            RegionEngineWithStatus::Ready(engine) => engine,
848        }
849    }
850}
851
852impl Deref for RegionEngineWithStatus {
853    type Target = RegionEngineRef;
854
855    fn deref(&self) -> &Self::Target {
856        match self {
857            RegionEngineWithStatus::Registering(engine) => engine,
858            RegionEngineWithStatus::Deregistering(engine) => engine,
859            RegionEngineWithStatus::Ready(engine) => engine,
860        }
861    }
862}
863
/// Shared state behind the region server: the registered engines, the per-region
/// status map and the collaborators used to serve requests.
struct RegionServerInner {
    /// Registered region engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Engine and lifecycle status of every region known to this server.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine used to execute logical plans for reads.
    query_engine: QueryEngineRef,
    // NOTE(review): not used by the methods visible in this part of the file — presumably
    // the runtime on which region work is spawned; confirm against callers.
    runtime: Runtime,
    /// Listener notified when regions are registered or deregistered.
    event_listener: RegionServerEventListenerRef,
    // NOTE(review): not used by the methods visible in this part of the file — presumably
    // builds table providers for queries; confirm against callers.
    table_provider_factory: TableProviderFactoryRef,
    /// The number of queries allowed to be executed at the same time.
    /// Act as last line of defense on datanode to prevent query overloading.
    parallelism: Option<RegionServerParallelism>,
    /// The topic stats reporter.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    /// server with a concrete engine; acceptable for now to fetch Mito-specific
    /// info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
}
881
/// Concurrency limiter for queries: a semaphore plus the maximum time a caller
/// waits for a permit.
struct RegionServerParallelism {
    /// Permits available for concurrently running queries.
    semaphore: Semaphore,
    /// Upper bound on how long `acquire` waits for a permit.
    timeout: Duration,
}
886
887impl RegionServerParallelism {
888    pub fn from_opts(
889        max_concurrent_queries: usize,
890        concurrent_query_limiter_timeout: Duration,
891    ) -> Option<Self> {
892        if max_concurrent_queries == 0 {
893            return None;
894        }
895        Some(RegionServerParallelism {
896            semaphore: Semaphore::new(max_concurrent_queries),
897            timeout: concurrent_query_limiter_timeout,
898        })
899    }
900
901    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
902        timeout(self.timeout, self.semaphore.acquire())
903            .await
904            .context(ConcurrentQueryLimiterTimeoutSnafu)?
905            .context(ConcurrentQueryLimiterClosedSnafu)
906    }
907}
908
/// Outcome of resolving the engine for a region operation.
enum CurrentEngine {
    /// The engine that should handle the operation.
    Engine(RegionEngineRef),
    /// The operation needs no engine call; respond with this affected-rows count.
    EarlyReturn(AffectedRows),
}
913
914impl Debug for CurrentEngine {
915    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
916        match self {
917            CurrentEngine::Engine(engine) => f
918                .debug_struct("CurrentEngine")
919                .field("engine", &engine.name())
920                .finish(),
921            CurrentEngine::EarlyReturn(rows) => f
922                .debug_struct("CurrentEngine")
923                .field("return", rows)
924                .finish(),
925        }
926    }
927}
928
929impl RegionServerInner {
930    pub fn new(
931        query_engine: QueryEngineRef,
932        runtime: Runtime,
933        event_listener: RegionServerEventListenerRef,
934        table_provider_factory: TableProviderFactoryRef,
935        parallelism: Option<RegionServerParallelism>,
936    ) -> Self {
937        Self {
938            engines: RwLock::new(HashMap::new()),
939            region_map: DashMap::new(),
940            query_engine,
941            runtime,
942            event_listener,
943            table_provider_factory,
944            parallelism,
945            topic_stats_reporter: RwLock::new(None),
946            mito_engine: RwLock::new(None),
947        }
948    }
949
950    pub fn register_engine(&self, engine: RegionEngineRef) {
951        let engine_name = engine.name();
952        if engine_name == MITO_ENGINE_NAME
953            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
954        {
955            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
956        }
957
958        info!("Region Engine {engine_name} is registered");
959        self.engines
960            .write()
961            .unwrap()
962            .insert(engine_name.to_string(), engine);
963    }
964
965    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
966        info!("Set topic stats reporter");
967        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
968    }
969
970    fn get_engine(
971        &self,
972        region_id: RegionId,
973        region_change: &RegionChange,
974    ) -> Result<CurrentEngine> {
975        let current_region_status = self.region_map.get(&region_id);
976
977        let engine = match region_change {
978            RegionChange::Register(attribute) => match current_region_status {
979                Some(status) => match status.clone() {
980                    RegionEngineWithStatus::Registering(engine) => engine,
981                    RegionEngineWithStatus::Deregistering(_) => {
982                        return error::RegionBusySnafu { region_id }.fail();
983                    }
984                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
985                },
986                _ => self
987                    .engines
988                    .read()
989                    .unwrap()
990                    .get(attribute.engine())
991                    .with_context(|| RegionEngineNotFoundSnafu {
992                        name: attribute.engine(),
993                    })?
994                    .clone(),
995            },
996            RegionChange::Deregisters => match current_region_status {
997                Some(status) => match status.clone() {
998                    RegionEngineWithStatus::Registering(_) => {
999                        return error::RegionBusySnafu { region_id }.fail();
1000                    }
1001                    RegionEngineWithStatus::Deregistering(_) => {
1002                        return Ok(CurrentEngine::EarlyReturn(0));
1003                    }
1004                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1005                },
1006                None => return Ok(CurrentEngine::EarlyReturn(0)),
1007            },
1008            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1009                match current_region_status {
1010                    Some(status) => match status.clone() {
1011                        RegionEngineWithStatus::Registering(_) => {
1012                            return error::RegionNotReadySnafu { region_id }.fail();
1013                        }
1014                        RegionEngineWithStatus::Deregistering(_) => {
1015                            return error::RegionNotFoundSnafu { region_id }.fail();
1016                        }
1017                        RegionEngineWithStatus::Ready(engine) => engine,
1018                    },
1019                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
1020                }
1021            }
1022        };
1023
1024        Ok(CurrentEngine::Engine(engine))
1025    }
1026
1027    async fn handle_batch_open_requests_inner(
1028        &self,
1029        engine: RegionEngineRef,
1030        parallelism: usize,
1031        requests: Vec<(RegionId, RegionOpenRequest)>,
1032        ignore_nonexistent_region: bool,
1033    ) -> Result<Vec<RegionId>> {
1034        let region_changes = requests
1035            .iter()
1036            .map(|(region_id, open)| {
1037                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1038                Ok((*region_id, RegionChange::Register(attribute)))
1039            })
1040            .collect::<Result<HashMap<_, _>>>()?;
1041
1042        for (&region_id, region_change) in &region_changes {
1043            self.set_region_status_not_ready(region_id, &engine, region_change)
1044        }
1045
1046        let mut open_regions = Vec::with_capacity(requests.len());
1047        let mut errors = vec![];
1048        match engine
1049            .handle_batch_open_requests(parallelism, requests)
1050            .await
1051            .with_context(|_| HandleBatchOpenRequestSnafu)
1052        {
1053            Ok(results) => {
1054                for (region_id, result) in results {
1055                    let region_change = &region_changes[&region_id];
1056                    match result {
1057                        Ok(_) => {
1058                            if let Err(e) = self
1059                                .set_region_status_ready(region_id, engine.clone(), *region_change)
1060                                .await
1061                            {
1062                                error!(e; "Failed to set region to ready: {}", region_id);
1063                                errors.push(BoxedError::new(e));
1064                            } else {
1065                                open_regions.push(region_id)
1066                            }
1067                        }
1068                        Err(e) => {
1069                            self.unset_region_status(region_id, &engine, *region_change);
1070                            if e.status_code() == StatusCode::RegionNotFound
1071                                && ignore_nonexistent_region
1072                            {
1073                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1074                            } else {
1075                                error!(e; "Failed to open region: {}", region_id);
1076                                errors.push(e);
1077                            }
1078                        }
1079                    }
1080                }
1081            }
1082            Err(e) => {
1083                for (&region_id, region_change) in &region_changes {
1084                    self.unset_region_status(region_id, &engine, *region_change);
1085                }
1086                error!(e; "Failed to open batch regions");
1087                errors.push(BoxedError::new(e));
1088            }
1089        }
1090
1091        if !errors.is_empty() {
1092            return error::UnexpectedSnafu {
1093                // Returns the first error.
1094                violated: format!("Failed to open batch regions: {:?}", errors[0]),
1095            }
1096            .fail();
1097        }
1098
1099        Ok(open_regions)
1100    }
1101
1102    pub async fn handle_batch_open_requests(
1103        &self,
1104        parallelism: usize,
1105        requests: Vec<(RegionId, RegionOpenRequest)>,
1106        ignore_nonexistent_region: bool,
1107    ) -> Result<Vec<RegionId>> {
1108        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1109            HashMap::with_capacity(requests.len());
1110        for (region_id, request) in requests {
1111            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1112                requests.push((region_id, request));
1113            } else {
1114                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1115            }
1116        }
1117
1118        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1119        for (engine, requests) in engine_grouped_requests {
1120            let engine = self
1121                .engines
1122                .read()
1123                .unwrap()
1124                .get(&engine)
1125                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1126                .clone();
1127            results.push(
1128                self.handle_batch_open_requests_inner(
1129                    engine,
1130                    parallelism,
1131                    requests,
1132                    ignore_nonexistent_region,
1133                )
1134                .await,
1135            )
1136        }
1137
1138        Ok(results
1139            .into_iter()
1140            .collect::<Result<Vec<_>>>()?
1141            .into_iter()
1142            .flatten()
1143            .collect::<Vec<_>>())
1144    }
1145
1146    pub async fn handle_batch_catchup_requests_inner(
1147        &self,
1148        engine: RegionEngineRef,
1149        parallelism: usize,
1150        requests: Vec<(RegionId, RegionCatchupRequest)>,
1151    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1152        for (region_id, _) in &requests {
1153            self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1154        }
1155        let region_ids = requests
1156            .iter()
1157            .map(|(region_id, _)| *region_id)
1158            .collect::<Vec<_>>();
1159        let mut responses = Vec::with_capacity(requests.len());
1160        match engine
1161            .handle_batch_catchup_requests(parallelism, requests)
1162            .await
1163        {
1164            Ok(results) => {
1165                for (region_id, result) in results {
1166                    match result {
1167                        Ok(_) => {
1168                            if let Err(e) = self
1169                                .set_region_status_ready(
1170                                    region_id,
1171                                    engine.clone(),
1172                                    RegionChange::Catchup,
1173                                )
1174                                .await
1175                            {
1176                                error!(e; "Failed to set region to ready: {}", region_id);
1177                                responses.push((region_id, Err(BoxedError::new(e))));
1178                            } else {
1179                                responses.push((region_id, Ok(())));
1180                            }
1181                        }
1182                        Err(e) => {
1183                            self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1184                            error!(e; "Failed to catchup region: {}", region_id);
1185                            responses.push((region_id, Err(e)));
1186                        }
1187                    }
1188                }
1189            }
1190            Err(e) => {
1191                for region_id in region_ids {
1192                    self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1193                }
1194                error!(e; "Failed to catchup batch regions");
1195                return error::UnexpectedSnafu {
1196                    violated: format!("Failed to catchup batch regions: {:?}", e),
1197                }
1198                .fail();
1199            }
1200        }
1201
1202        Ok(responses)
1203    }
1204
1205    pub async fn handle_batch_catchup_requests(
1206        &self,
1207        parallelism: usize,
1208        requests: Vec<(RegionId, RegionCatchupRequest)>,
1209    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1210        let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1211
1212        let mut responses = Vec::with_capacity(requests.len());
1213        for (region_id, request) in requests {
1214            if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1215                match engine {
1216                    CurrentEngine::Engine(engine) => {
1217                        engine_grouped_requests
1218                            .entry(engine.name().to_string())
1219                            .or_default()
1220                            .push((region_id, request));
1221                    }
1222                    CurrentEngine::EarlyReturn(_) => {
1223                        return error::UnexpectedSnafu {
1224                            violated: format!("Unexpected engine type for region {}", region_id),
1225                        }
1226                        .fail();
1227                    }
1228                }
1229            } else {
1230                responses.push((
1231                    region_id,
1232                    Err(BoxedError::new(
1233                        error::RegionNotFoundSnafu { region_id }.build(),
1234                    )),
1235                ));
1236            }
1237        }
1238
1239        for (engine, requests) in engine_grouped_requests {
1240            let engine = self
1241                .engines
1242                .read()
1243                .unwrap()
1244                .get(&engine)
1245                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1246                .clone();
1247            responses.extend(
1248                self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1249                    .await?,
1250            );
1251        }
1252
1253        Ok(responses)
1254    }
1255
1256    // Handle requests in batch.
1257    //
1258    // limitation: all create requests must be in the same engine.
1259    pub async fn handle_batch_request(
1260        &self,
1261        batch_request: BatchRegionDdlRequest,
1262    ) -> Result<RegionResponse> {
1263        let region_changes = match &batch_request {
1264            BatchRegionDdlRequest::Create(requests) => requests
1265                .iter()
1266                .map(|(region_id, create)| {
1267                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
1268                    Ok((*region_id, RegionChange::Register(attribute)))
1269                })
1270                .collect::<Result<Vec<_>>>()?,
1271            BatchRegionDdlRequest::Drop(requests) => requests
1272                .iter()
1273                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1274                .collect::<Vec<_>>(),
1275            BatchRegionDdlRequest::Alter(requests) => requests
1276                .iter()
1277                .map(|(region_id, _)| (*region_id, RegionChange::None))
1278                .collect::<Vec<_>>(),
1279        };
1280
1281        // The ddl procedure will ensure all requests are in the same engine.
1282        // Therefore, we can get the engine from the first request.
1283        let (first_region_id, first_region_change) = region_changes.first().unwrap();
1284        let engine = match self.get_engine(*first_region_id, first_region_change)? {
1285            CurrentEngine::Engine(engine) => engine,
1286            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1287        };
1288
1289        for (region_id, region_change) in region_changes.iter() {
1290            self.set_region_status_not_ready(*region_id, &engine, region_change);
1291        }
1292
1293        let ddl_type = batch_request.request_type();
1294        let result = engine
1295            .handle_batch_ddl_requests(batch_request)
1296            .await
1297            .context(HandleBatchDdlRequestSnafu { ddl_type });
1298
1299        match result {
1300            Ok(result) => {
1301                for (region_id, region_change) in &region_changes {
1302                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1303                        .await?;
1304                }
1305
1306                Ok(RegionResponse {
1307                    affected_rows: result.affected_rows,
1308                    extensions: result.extensions,
1309                    metadata: Vec::new(),
1310                })
1311            }
1312            Err(err) => {
1313                for (region_id, region_change) in region_changes {
1314                    self.unset_region_status(region_id, &engine, region_change);
1315                }
1316
1317                Err(err)
1318            }
1319        }
1320    }
1321
1322    pub async fn handle_request(
1323        &self,
1324        region_id: RegionId,
1325        request: RegionRequest,
1326    ) -> Result<RegionResponse> {
1327        let request_type = request.request_type();
1328        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1329            .with_label_values(&[request_type])
1330            .start_timer();
1331
1332        let region_change = match &request {
1333            RegionRequest::Create(create) => {
1334                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1335                RegionChange::Register(attribute)
1336            }
1337            RegionRequest::Open(open) => {
1338                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1339                RegionChange::Register(attribute)
1340            }
1341            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1342            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1343                RegionChange::Ingest
1344            }
1345            RegionRequest::Alter(_)
1346            | RegionRequest::Flush(_)
1347            | RegionRequest::Compact(_)
1348            | RegionRequest::Truncate(_)
1349            | RegionRequest::BuildIndex(_)
1350            | RegionRequest::EnterStaging(_)
1351            | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1352            RegionRequest::Catchup(_) => RegionChange::Catchup,
1353        };
1354
1355        let engine = match self.get_engine(region_id, &region_change)? {
1356            CurrentEngine::Engine(engine) => engine,
1357            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1358        };
1359
1360        // Sets corresponding region status to registering/deregistering before the operation.
1361        self.set_region_status_not_ready(region_id, &engine, &region_change);
1362
1363        match engine
1364            .handle_request(region_id, request)
1365            .await
1366            .with_context(|_| HandleRegionRequestSnafu { region_id })
1367        {
1368            Ok(result) => {
1369                // Update metrics
1370                if matches!(region_change, RegionChange::Ingest) {
1371                    crate::metrics::REGION_CHANGED_ROW_COUNT
1372                        .with_label_values(&[request_type])
1373                        .inc_by(result.affected_rows as u64);
1374                }
1375                // Sets corresponding region status to ready.
1376                self.set_region_status_ready(region_id, engine.clone(), region_change)
1377                    .await?;
1378
1379                Ok(RegionResponse {
1380                    affected_rows: result.affected_rows,
1381                    extensions: result.extensions,
1382                    metadata: Vec::new(),
1383                })
1384            }
1385            Err(err) => {
1386                if matches!(region_change, RegionChange::Ingest) {
1387                    crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1388                        .with_label_values(&[request_type])
1389                        .inc();
1390                }
1391                // Removes the region status if the operation fails.
1392                self.unset_region_status(region_id, &engine, region_change);
1393                Err(err)
1394            }
1395        }
1396    }
1397
1398    /// Handles the sync region request.
1399    pub async fn handle_sync_region(
1400        &self,
1401        engine: &RegionEngineRef,
1402        region_id: RegionId,
1403        request: SyncRegionFromRequest,
1404    ) -> Result<()> {
1405        let Some(new_opened_regions) = engine
1406            .sync_region(region_id, request)
1407            .await
1408            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1409            .new_opened_logical_region_ids()
1410        else {
1411            return Ok(());
1412        };
1413
1414        for region in new_opened_regions {
1415            self.region_map
1416                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1417            info!("Logical region {} is registered!", region);
1418        }
1419
1420        Ok(())
1421    }
1422
1423    fn set_region_status_not_ready(
1424        &self,
1425        region_id: RegionId,
1426        engine: &RegionEngineRef,
1427        region_change: &RegionChange,
1428    ) {
1429        match region_change {
1430            RegionChange::Register(_) => {
1431                self.region_map.insert(
1432                    region_id,
1433                    RegionEngineWithStatus::Registering(engine.clone()),
1434                );
1435            }
1436            RegionChange::Deregisters => {
1437                self.region_map.insert(
1438                    region_id,
1439                    RegionEngineWithStatus::Deregistering(engine.clone()),
1440                );
1441            }
1442            _ => {}
1443        }
1444    }
1445
1446    fn unset_region_status(
1447        &self,
1448        region_id: RegionId,
1449        engine: &RegionEngineRef,
1450        region_change: RegionChange,
1451    ) {
1452        match region_change {
1453            RegionChange::None | RegionChange::Ingest => {}
1454            RegionChange::Register(_) => {
1455                self.region_map.remove(&region_id);
1456            }
1457            RegionChange::Deregisters => {
1458                self.region_map
1459                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1460            }
1461            RegionChange::Catchup => {}
1462        }
1463    }
1464
1465    async fn set_region_status_ready(
1466        &self,
1467        region_id: RegionId,
1468        engine: RegionEngineRef,
1469        region_change: RegionChange,
1470    ) -> Result<()> {
1471        let engine_type = engine.name();
1472        match region_change {
1473            RegionChange::None | RegionChange::Ingest => {}
1474            RegionChange::Register(attribute) => {
1475                info!(
1476                    "Region {region_id} is registered to engine {}",
1477                    attribute.engine()
1478                );
1479                self.region_map
1480                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1481
1482                match attribute {
1483                    RegionAttribute::Metric { physical } => {
1484                        if physical {
1485                            // Registers the logical regions belong to the physical region (`region_id`).
1486                            self.register_logical_regions(&engine, region_id).await?;
1487                            // We only send the `on_region_registered` event of the physical region.
1488                            self.event_listener.on_region_registered(region_id);
1489                        }
1490                    }
1491                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1492                    RegionAttribute::File => {
1493                        // do nothing
1494                    }
1495                }
1496            }
1497            RegionChange::Deregisters => {
1498                info!("Region {region_id} is deregistered from engine {engine_type}");
1499                self.region_map
1500                    .remove(&region_id)
1501                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1502                self.event_listener.on_region_deregistered(region_id);
1503            }
1504            RegionChange::Catchup => {
1505                if is_metric_engine(engine.name()) {
1506                    // Registers the logical regions belong to the physical region (`region_id`).
1507                    self.register_logical_regions(&engine, region_id).await?;
1508                }
1509            }
1510        }
1511        Ok(())
1512    }
1513
1514    async fn register_logical_regions(
1515        &self,
1516        engine: &RegionEngineRef,
1517        physical_region_id: RegionId,
1518    ) -> Result<()> {
1519        let metric_engine =
1520            engine
1521                .as_any()
1522                .downcast_ref::<MetricEngine>()
1523                .context(UnexpectedSnafu {
1524                    violated: format!(
1525                        "expecting engine type '{}', actual '{}'",
1526                        METRIC_ENGINE_NAME,
1527                        engine.name(),
1528                    ),
1529                })?;
1530
1531        let logical_regions = metric_engine
1532            .logical_regions(physical_region_id)
1533            .await
1534            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1535
1536        for region in logical_regions {
1537            self.region_map
1538                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1539            info!("Logical region {} is registered!", region);
1540        }
1541        Ok(())
1542    }
1543
1544    pub async fn handle_read(
1545        &self,
1546        request: QueryRequest,
1547        query_ctx: QueryContextRef,
1548    ) -> Result<SendableRecordBatchStream> {
1549        // TODO(ruihang): add metrics and set trace id
1550
1551        let result = self
1552            .query_engine
1553            .execute(request.plan, query_ctx)
1554            .await
1555            .context(ExecuteLogicalPlanSnafu)?;
1556
1557        match result.data {
1558            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1559                UnsupportedOutputSnafu { expected: "stream" }.fail()
1560            }
1561            OutputData::Stream(stream) => Ok(stream),
1562        }
1563    }
1564
1565    async fn stop(&self) -> Result<()> {
1566        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1567        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1568        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1569        //
1570        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1571        // it here, collect the values first then use later separately.
1572
1573        let regions = self
1574            .region_map
1575            .iter()
1576            .map(|x| (*x.key(), x.value().clone()))
1577            .collect::<Vec<_>>();
1578        let num_regions = regions.len();
1579
1580        for (region_id, engine) in regions {
1581            let closed = engine
1582                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1583                .await;
1584            match closed {
1585                Ok(_) => debug!("Region {region_id} is closed"),
1586                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1587            }
1588        }
1589        self.region_map.clear();
1590        info!("closed {num_regions} regions");
1591
1592        drop(self.mito_engine.write().unwrap().take());
1593        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1594        for (engine_name, engine) in engines {
1595            engine
1596                .stop()
1597                .await
1598                .context(StopRegionEngineSnafu { name: &engine_name })?;
1599            info!("Region engine {engine_name} is stopped");
1600        }
1601
1602        Ok(())
1603    }
1604}
1605
/// How a request affects a region's registration in the region server.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the region's registration (e.g. alter, flush, compact).
    None,
    /// The request registers the region (create or open); carries the region attribute.
    Register(RegionAttribute),
    /// The request deregisters the region (close or drop).
    Deregisters,
    /// The request is a catchup.
    Catchup,
    /// The request writes data (put, delete or bulk insert).
    Ingest,
}
1614
/// Returns `true` when `engine` is the name of the metric engine.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1618
1619fn parse_region_attribute(
1620    engine: &str,
1621    options: &HashMap<String, String>,
1622) -> Result<RegionAttribute> {
1623    match engine {
1624        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1625        METRIC_ENGINE_NAME => {
1626            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1627
1628            Ok(RegionAttribute::Metric { physical })
1629        }
1630        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1631        _ => error::UnexpectedSnafu {
1632            violated: format!("Unknown engine: {}", engine),
1633        }
1634        .fail(),
1635    }
1636}
1637
/// Describes which region engine a region belongs to, plus engine-specific details.
#[derive(Clone, Copy, Debug)]
enum RegionAttribute {
    /// A region of the mito engine.
    Mito,
    /// A region of the metric engine.
    Metric {
        /// `true` for a physical region, `false` for a logical one.
        physical: bool,
    },
    /// A region of the file engine.
    File,
}
1644
1645impl RegionAttribute {
1646    fn engine(&self) -> &'static str {
1647        match self {
1648            RegionAttribute::Mito => MITO_ENGINE_NAME,
1649            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1650            RegionAttribute::File => FILE_ENGINE_NAME,
1651        }
1652    }
1653}
1654
1655#[cfg(test)]
1656mod tests {
1657
1658    use std::assert_matches::assert_matches;
1659
1660    use api::v1::SemanticType;
1661    use common_error::ext::ErrorExt;
1662    use datatypes::prelude::ConcreteDataType;
1663    use mito2::test_util::CreateRequestBuilder;
1664    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1665    use store_api::region_engine::RegionEngine;
1666    use store_api::region_request::{
1667        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1668    };
1669    use store_api::storage::RegionId;
1670
1671    use super::*;
1672    use crate::error::Result;
1673    use crate::tests::{MockRegionEngine, mock_region_server};
1674
1675    #[tokio::test]
1676    async fn test_region_registering() {
1677        common_telemetry::init_default_ut_logging();
1678
1679        let mut mock_region_server = mock_region_server();
1680        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1681        let engine_name = engine.name();
1682        mock_region_server.register_engine(engine.clone());
1683        let region_id = RegionId::new(1, 1);
1684        let builder = CreateRequestBuilder::new();
1685        let create_req = builder.build();
1686        // Tries to create/open a registering region.
1687        mock_region_server.inner.region_map.insert(
1688            region_id,
1689            RegionEngineWithStatus::Registering(engine.clone()),
1690        );
1691        let response = mock_region_server
1692            .handle_request(region_id, RegionRequest::Create(create_req))
1693            .await
1694            .unwrap();
1695        assert_eq!(response.affected_rows, 0);
1696        let status = mock_region_server
1697            .inner
1698            .region_map
1699            .get(&region_id)
1700            .unwrap()
1701            .clone();
1702        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1703
1704        mock_region_server.inner.region_map.insert(
1705            region_id,
1706            RegionEngineWithStatus::Registering(engine.clone()),
1707        );
1708        let response = mock_region_server
1709            .handle_request(
1710                region_id,
1711                RegionRequest::Open(RegionOpenRequest {
1712                    engine: engine_name.to_string(),
1713                    table_dir: String::new(),
1714                    path_type: PathType::Bare,
1715                    options: Default::default(),
1716                    skip_wal_replay: false,
1717                    checkpoint: None,
1718                }),
1719            )
1720            .await
1721            .unwrap();
1722        assert_eq!(response.affected_rows, 0);
1723        let status = mock_region_server
1724            .inner
1725            .region_map
1726            .get(&region_id)
1727            .unwrap()
1728            .clone();
1729        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1730    }
1731
1732    #[tokio::test]
1733    async fn test_region_deregistering() {
1734        common_telemetry::init_default_ut_logging();
1735
1736        let mut mock_region_server = mock_region_server();
1737        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1738
1739        mock_region_server.register_engine(engine.clone());
1740
1741        let region_id = RegionId::new(1, 1);
1742
1743        // Tries to drop/close a registering region.
1744        mock_region_server.inner.region_map.insert(
1745            region_id,
1746            RegionEngineWithStatus::Deregistering(engine.clone()),
1747        );
1748
1749        let response = mock_region_server
1750            .handle_request(
1751                region_id,
1752                RegionRequest::Drop(RegionDropRequest {
1753                    fast_path: false,
1754                    force: false,
1755                }),
1756            )
1757            .await
1758            .unwrap();
1759        assert_eq!(response.affected_rows, 0);
1760
1761        let status = mock_region_server
1762            .inner
1763            .region_map
1764            .get(&region_id)
1765            .unwrap()
1766            .clone();
1767        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1768
1769        mock_region_server.inner.region_map.insert(
1770            region_id,
1771            RegionEngineWithStatus::Deregistering(engine.clone()),
1772        );
1773
1774        let response = mock_region_server
1775            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1776            .await
1777            .unwrap();
1778        assert_eq!(response.affected_rows, 0);
1779
1780        let status = mock_region_server
1781            .inner
1782            .region_map
1783            .get(&region_id)
1784            .unwrap()
1785            .clone();
1786        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1787    }
1788
1789    #[tokio::test]
1790    async fn test_region_not_ready() {
1791        common_telemetry::init_default_ut_logging();
1792
1793        let mut mock_region_server = mock_region_server();
1794        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1795
1796        mock_region_server.register_engine(engine.clone());
1797
1798        let region_id = RegionId::new(1, 1);
1799
1800        // Tries to drop/close a registering region.
1801        mock_region_server.inner.region_map.insert(
1802            region_id,
1803            RegionEngineWithStatus::Registering(engine.clone()),
1804        );
1805
1806        let err = mock_region_server
1807            .handle_request(
1808                region_id,
1809                RegionRequest::Truncate(RegionTruncateRequest::All),
1810            )
1811            .await
1812            .unwrap_err();
1813
1814        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1815    }
1816
1817    #[tokio::test]
1818    async fn test_region_request_failed() {
1819        common_telemetry::init_default_ut_logging();
1820
1821        let mut mock_region_server = mock_region_server();
1822        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1823            MITO_ENGINE_NAME,
1824            Box::new(|_region_id, _request| {
1825                error::UnexpectedSnafu {
1826                    violated: "test".to_string(),
1827                }
1828                .fail()
1829            }),
1830        );
1831
1832        mock_region_server.register_engine(engine.clone());
1833
1834        let region_id = RegionId::new(1, 1);
1835        let builder = CreateRequestBuilder::new();
1836        let create_req = builder.build();
1837        mock_region_server
1838            .handle_request(region_id, RegionRequest::Create(create_req))
1839            .await
1840            .unwrap_err();
1841
1842        let status = mock_region_server.inner.region_map.get(&region_id);
1843        assert!(status.is_none());
1844
1845        mock_region_server
1846            .inner
1847            .region_map
1848            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1849
1850        mock_region_server
1851            .handle_request(
1852                region_id,
1853                RegionRequest::Drop(RegionDropRequest {
1854                    fast_path: false,
1855                    force: false,
1856                }),
1857            )
1858            .await
1859            .unwrap_err();
1860
1861        let status = mock_region_server.inner.region_map.get(&region_id);
1862        assert!(status.is_some());
1863    }
1864
1865    #[tokio::test]
1866    async fn test_batch_open_region_ignore_nonexistent_regions() {
1867        common_telemetry::init_default_ut_logging();
1868        let mut mock_region_server = mock_region_server();
1869        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1870            MITO_ENGINE_NAME,
1871            Box::new(|region_id, _request| {
1872                if region_id == RegionId::new(1, 1) {
1873                    error::RegionNotFoundSnafu { region_id }.fail()
1874                } else {
1875                    Ok(0)
1876                }
1877            }),
1878        );
1879        mock_region_server.register_engine(engine.clone());
1880
1881        let region_ids = mock_region_server
1882            .handle_batch_open_requests(
1883                8,
1884                vec![
1885                    (
1886                        RegionId::new(1, 1),
1887                        RegionOpenRequest {
1888                            engine: MITO_ENGINE_NAME.to_string(),
1889                            table_dir: String::new(),
1890                            path_type: PathType::Bare,
1891                            options: Default::default(),
1892                            skip_wal_replay: false,
1893                            checkpoint: None,
1894                        },
1895                    ),
1896                    (
1897                        RegionId::new(1, 2),
1898                        RegionOpenRequest {
1899                            engine: MITO_ENGINE_NAME.to_string(),
1900                            table_dir: String::new(),
1901                            path_type: PathType::Bare,
1902                            options: Default::default(),
1903                            skip_wal_replay: false,
1904                            checkpoint: None,
1905                        },
1906                    ),
1907                ],
1908                true,
1909            )
1910            .await
1911            .unwrap();
1912        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1913
1914        let err = mock_region_server
1915            .handle_batch_open_requests(
1916                8,
1917                vec![
1918                    (
1919                        RegionId::new(1, 1),
1920                        RegionOpenRequest {
1921                            engine: MITO_ENGINE_NAME.to_string(),
1922                            table_dir: String::new(),
1923                            path_type: PathType::Bare,
1924                            options: Default::default(),
1925                            skip_wal_replay: false,
1926                            checkpoint: None,
1927                        },
1928                    ),
1929                    (
1930                        RegionId::new(1, 2),
1931                        RegionOpenRequest {
1932                            engine: MITO_ENGINE_NAME.to_string(),
1933                            table_dir: String::new(),
1934                            path_type: PathType::Bare,
1935                            options: Default::default(),
1936                            skip_wal_replay: false,
1937                            checkpoint: None,
1938                        },
1939                    ),
1940                ],
1941                false,
1942            )
1943            .await
1944            .unwrap_err();
1945        assert_eq!(err.status_code(), StatusCode::Unexpected);
1946    }
1947
1948    struct CurrentEngineTest {
1949        region_id: RegionId,
1950        current_region_status: Option<RegionEngineWithStatus>,
1951        region_change: RegionChange,
1952        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1953    }
1954
1955    #[tokio::test]
1956    async fn test_current_engine() {
1957        common_telemetry::init_default_ut_logging();
1958
1959        let mut mock_region_server = mock_region_server();
1960        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1961        mock_region_server.register_engine(engine.clone());
1962
1963        let region_id = RegionId::new(1024, 1);
1964        let tests = vec![
1965            // RegionChange::None
1966            CurrentEngineTest {
1967                region_id,
1968                current_region_status: None,
1969                region_change: RegionChange::None,
1970                assert: Box::new(|result| {
1971                    let err = result.unwrap_err();
1972                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1973                }),
1974            },
1975            CurrentEngineTest {
1976                region_id,
1977                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1978                region_change: RegionChange::None,
1979                assert: Box::new(|result| {
1980                    let current_engine = result.unwrap();
1981                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1982                }),
1983            },
1984            CurrentEngineTest {
1985                region_id,
1986                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1987                region_change: RegionChange::None,
1988                assert: Box::new(|result| {
1989                    let err = result.unwrap_err();
1990                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1991                }),
1992            },
1993            CurrentEngineTest {
1994                region_id,
1995                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1996                region_change: RegionChange::None,
1997                assert: Box::new(|result| {
1998                    let err = result.unwrap_err();
1999                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2000                }),
2001            },
2002            // RegionChange::Register
2003            CurrentEngineTest {
2004                region_id,
2005                current_region_status: None,
2006                region_change: RegionChange::Register(RegionAttribute::Mito),
2007                assert: Box::new(|result| {
2008                    let current_engine = result.unwrap();
2009                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2010                }),
2011            },
2012            CurrentEngineTest {
2013                region_id,
2014                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2015                region_change: RegionChange::Register(RegionAttribute::Mito),
2016                assert: Box::new(|result| {
2017                    let current_engine = result.unwrap();
2018                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2019                }),
2020            },
2021            CurrentEngineTest {
2022                region_id,
2023                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2024                region_change: RegionChange::Register(RegionAttribute::Mito),
2025                assert: Box::new(|result| {
2026                    let err = result.unwrap_err();
2027                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
2028                }),
2029            },
2030            CurrentEngineTest {
2031                region_id,
2032                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2033                region_change: RegionChange::Register(RegionAttribute::Mito),
2034                assert: Box::new(|result| {
2035                    let current_engine = result.unwrap();
2036                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2037                }),
2038            },
2039            // RegionChange::Deregister
2040            CurrentEngineTest {
2041                region_id,
2042                current_region_status: None,
2043                region_change: RegionChange::Deregisters,
2044                assert: Box::new(|result| {
2045                    let current_engine = result.unwrap();
2046                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2047                }),
2048            },
2049            CurrentEngineTest {
2050                region_id,
2051                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2052                region_change: RegionChange::Deregisters,
2053                assert: Box::new(|result| {
2054                    let err = result.unwrap_err();
2055                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
2056                }),
2057            },
2058            CurrentEngineTest {
2059                region_id,
2060                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2061                region_change: RegionChange::Deregisters,
2062                assert: Box::new(|result| {
2063                    let current_engine = result.unwrap();
2064                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2065                }),
2066            },
2067            CurrentEngineTest {
2068                region_id,
2069                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2070                region_change: RegionChange::Deregisters,
2071                assert: Box::new(|result| {
2072                    let current_engine = result.unwrap();
2073                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2074                }),
2075            },
2076        ];
2077
2078        for test in tests {
2079            let CurrentEngineTest {
2080                region_id,
2081                current_region_status,
2082                region_change,
2083                assert,
2084            } = test;
2085
2086            // Sets up
2087            if let Some(status) = current_region_status {
2088                mock_region_server
2089                    .inner
2090                    .region_map
2091                    .insert(region_id, status);
2092            } else {
2093                mock_region_server.inner.region_map.remove(&region_id);
2094            }
2095
2096            let result = mock_region_server
2097                .inner
2098                .get_engine(region_id, &region_change);
2099
2100            assert(result);
2101        }
2102    }
2103
2104    #[tokio::test]
2105    async fn test_region_server_parallelism() {
2106        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2107        let first_query = p.acquire().await;
2108        assert!(first_query.is_ok());
2109        let second_query = p.acquire().await;
2110        assert!(second_query.is_ok());
2111        let third_query = p.acquire().await;
2112        assert!(third_query.is_err());
2113        let err = third_query.unwrap_err();
2114        assert_eq!(
2115            err.output_msg(),
2116            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2117        );
2118        drop(first_query);
2119        let forth_query = p.acquire().await;
2120        assert!(forth_query.is_ok());
2121    }
2122
2123    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2124        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2125        metadata_builder.push_column_metadata(ColumnMetadata {
2126            column_schema: datatypes::schema::ColumnSchema::new(
2127                "timestamp",
2128                ConcreteDataType::timestamp_nanosecond_datatype(),
2129                false,
2130            ),
2131            semantic_type: SemanticType::Timestamp,
2132            column_id: 0,
2133        });
2134        metadata_builder.push_column_metadata(ColumnMetadata {
2135            column_schema: datatypes::schema::ColumnSchema::new(
2136                "file",
2137                ConcreteDataType::string_datatype(),
2138                true,
2139            ),
2140            semantic_type: SemanticType::Tag,
2141            column_id: 1,
2142        });
2143        metadata_builder.push_column_metadata(ColumnMetadata {
2144            column_schema: datatypes::schema::ColumnSchema::new(
2145                "message",
2146                ConcreteDataType::string_datatype(),
2147                true,
2148            ),
2149            semantic_type: SemanticType::Field,
2150            column_id: 2,
2151        });
2152        metadata_builder.primary_key(vec![1]);
2153        metadata_builder.build().unwrap()
2154    }
2155
2156    #[tokio::test]
2157    async fn test_handle_list_metadata_request() {
2158        common_telemetry::init_default_ut_logging();
2159
2160        let mut mock_region_server = mock_region_server();
2161        let region_id_1 = RegionId::new(1, 0);
2162        let region_id_2 = RegionId::new(2, 0);
2163
2164        let metadata_1 = mock_region_metadata(region_id_1);
2165        let metadata_2 = mock_region_metadata(region_id_2);
2166        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2167
2168        let metadata_1 = Arc::new(metadata_1);
2169        let metadata_2 = Arc::new(metadata_2);
2170        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2171            MITO_ENGINE_NAME,
2172            Box::new(move |region_id| {
2173                if region_id == region_id_1 {
2174                    Ok(metadata_1.clone())
2175                } else if region_id == region_id_2 {
2176                    Ok(metadata_2.clone())
2177                } else {
2178                    error::RegionNotFoundSnafu { region_id }.fail()
2179                }
2180            }),
2181        );
2182
2183        mock_region_server.register_engine(engine.clone());
2184        mock_region_server
2185            .inner
2186            .region_map
2187            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2188        mock_region_server
2189            .inner
2190            .region_map
2191            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2192
2193        // All regions exist.
2194        let list_metadata_request = ListMetadataRequest {
2195            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2196        };
2197        let response = mock_region_server
2198            .handle_list_metadata_request(&list_metadata_request)
2199            .await
2200            .unwrap();
2201        let decoded_metadata: Vec<Option<RegionMetadata>> =
2202            serde_json::from_slice(&response.metadata).unwrap();
2203        assert_eq!(metadatas, decoded_metadata);
2204    }
2205
2206    #[tokio::test]
2207    async fn test_handle_list_metadata_not_found() {
2208        common_telemetry::init_default_ut_logging();
2209
2210        let mut mock_region_server = mock_region_server();
2211        let region_id_1 = RegionId::new(1, 0);
2212        let region_id_2 = RegionId::new(2, 0);
2213
2214        let metadata_1 = mock_region_metadata(region_id_1);
2215        let metadatas = vec![Some(metadata_1.clone()), None];
2216
2217        let metadata_1 = Arc::new(metadata_1);
2218        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2219            MITO_ENGINE_NAME,
2220            Box::new(move |region_id| {
2221                if region_id == region_id_1 {
2222                    Ok(metadata_1.clone())
2223                } else {
2224                    error::RegionNotFoundSnafu { region_id }.fail()
2225                }
2226            }),
2227        );
2228
2229        mock_region_server.register_engine(engine.clone());
2230        mock_region_server
2231            .inner
2232            .region_map
2233            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2234
2235        // Not in region map.
2236        let list_metadata_request = ListMetadataRequest {
2237            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2238        };
2239        let response = mock_region_server
2240            .handle_list_metadata_request(&list_metadata_request)
2241            .await
2242            .unwrap();
2243        let decoded_metadata: Vec<Option<RegionMetadata>> =
2244            serde_json::from_slice(&response.metadata).unwrap();
2245        assert_eq!(metadatas, decoded_metadata);
2246
2247        // Not in region engine.
2248        mock_region_server
2249            .inner
2250            .region_map
2251            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2252        let response = mock_region_server
2253            .handle_list_metadata_request(&list_metadata_request)
2254            .await
2255            .unwrap();
2256        let decoded_metadata: Vec<Option<RegionMetadata>> =
2257            serde_json::from_slice(&response.metadata).unwrap();
2258        assert_eq!(metadatas, decoded_metadata);
2259    }
2260
2261    #[tokio::test]
2262    async fn test_handle_list_metadata_failed() {
2263        common_telemetry::init_default_ut_logging();
2264
2265        let mut mock_region_server = mock_region_server();
2266        let region_id_1 = RegionId::new(1, 0);
2267
2268        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2269            MITO_ENGINE_NAME,
2270            Box::new(move |region_id| {
2271                error::UnexpectedSnafu {
2272                    violated: format!("Failed to get region {region_id}"),
2273                }
2274                .fail()
2275            }),
2276        );
2277
2278        mock_region_server.register_engine(engine.clone());
2279        mock_region_server
2280            .inner
2281            .region_map
2282            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2283
2284        // Failed to get.
2285        let list_metadata_request = ListMetadataRequest {
2286            region_ids: vec![region_id_1.as_u64()],
2287        };
2288        mock_region_server
2289            .handle_list_metadata_request(&list_metadata_request)
2290            .await
2291            .unwrap_err();
2292    }
2293}