datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use api::region::RegionResponse;
22use api::v1::region::sync_request::ManifestInfo;
23use api::v1::region::{
24    region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
25};
26use api::v1::{ResponseHeader, Status};
27use arrow_flight::{FlightData, Ticket};
28use async_trait::async_trait;
29use bytes::Bytes;
30use common_error::ext::{BoxedError, ErrorExt};
31use common_error::status_code::StatusCode;
32use common_query::request::QueryRequest;
33use common_query::OutputData;
34use common_recordbatch::SendableRecordBatchStream;
35use common_runtime::Runtime;
36use common_telemetry::tracing::{self, info_span};
37use common_telemetry::tracing_context::{FutureExt, TracingContext};
38use common_telemetry::{debug, error, info, warn};
39use dashmap::DashMap;
40use datafusion::datasource::{provider_as_source, TableProvider};
41use datafusion::error::Result as DfResult;
42use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
43use datafusion_expr::{LogicalPlan, TableSource};
44use futures_util::future::try_join_all;
45use metric_engine::engine::MetricEngine;
46use mito2::engine::MITO_ENGINE_NAME;
47use prost::Message;
48pub use query::dummy_catalog::{
49    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
50};
51use query::QueryEngineRef;
52use serde_json;
53use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
54use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
55use servers::grpc::region_server::RegionServerHandler;
56use servers::grpc::FlightCompression;
57use session::context::{QueryContextBuilder, QueryContextRef};
58use snafu::{ensure, OptionExt, ResultExt};
59use store_api::metric_engine_consts::{
60    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
61};
62use store_api::region_engine::{
63    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
64    SettableRegionRoleState,
65};
66use store_api::region_request::{
67    AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
68};
69use store_api::storage::RegionId;
70use tokio::sync::{Semaphore, SemaphorePermit};
71use tokio::time::timeout;
72use tonic::{Request, Response, Result as TonicResult};
73
74use crate::error::{
75    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
76    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
77    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
78    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
79    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
80    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
81};
82use crate::event_listener::RegionServerEventListenerRef;
83
/// Cloneable handle to the datanode's region server.
///
/// All clones share the same [`RegionServerInner`] through an `Arc`.
#[derive(Clone)]
pub struct RegionServer {
    inner: Arc<RegionServerInner>,
    /// Compression setting handed to `FlightRecordBatchStream` when serving `do_get`.
    flight_compression: FlightCompression,
}
89
/// A region's id, engine name and current role, as produced by
/// [`RegionServer::reportable_regions`].
pub struct RegionStat {
    pub region_id: RegionId,
    /// Name of the engine serving the region.
    pub engine: String,
    pub role: RegionRole,
}
95
96impl RegionServer {
97    pub fn new(
98        query_engine: QueryEngineRef,
99        runtime: Runtime,
100        event_listener: RegionServerEventListenerRef,
101        flight_compression: FlightCompression,
102    ) -> Self {
103        Self::with_table_provider(
104            query_engine,
105            runtime,
106            event_listener,
107            Arc::new(DummyTableProviderFactory),
108            0,
109            Duration::from_millis(0),
110            flight_compression,
111        )
112    }
113
114    pub fn with_table_provider(
115        query_engine: QueryEngineRef,
116        runtime: Runtime,
117        event_listener: RegionServerEventListenerRef,
118        table_provider_factory: TableProviderFactoryRef,
119        max_concurrent_queries: usize,
120        concurrent_query_limiter_timeout: Duration,
121        flight_compression: FlightCompression,
122    ) -> Self {
123        Self {
124            inner: Arc::new(RegionServerInner::new(
125                query_engine,
126                runtime,
127                event_listener,
128                table_provider_factory,
129                RegionServerParallelism::from_opts(
130                    max_concurrent_queries,
131                    concurrent_query_limiter_timeout,
132                ),
133            )),
134            flight_compression,
135        }
136    }
137
138    pub fn register_engine(&mut self, engine: RegionEngineRef) {
139        self.inner.register_engine(engine);
140    }
141
142    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
143    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
144        match self.inner.get_engine(region_id, &RegionChange::None) {
145            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
146            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
147            Err(error::Error::RegionNotFound { .. }) => Ok(None),
148            Err(err) => Err(err),
149        }
150    }
151
152    #[tracing::instrument(skip_all)]
153    pub async fn handle_batch_open_requests(
154        &self,
155        parallelism: usize,
156        requests: Vec<(RegionId, RegionOpenRequest)>,
157    ) -> Result<Vec<RegionId>> {
158        self.inner
159            .handle_batch_open_requests(parallelism, requests)
160            .await
161    }
162
163    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
164    pub async fn handle_request(
165        &self,
166        region_id: RegionId,
167        request: RegionRequest,
168    ) -> Result<RegionResponse> {
169        self.inner.handle_request(region_id, request).await
170    }
171
172    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
173    async fn table_provider(
174        &self,
175        region_id: RegionId,
176        ctx: Option<&session::context::QueryContext>,
177    ) -> Result<Arc<dyn TableProvider>> {
178        let status = self
179            .inner
180            .region_map
181            .get(&region_id)
182            .context(RegionNotFoundSnafu { region_id })?
183            .clone();
184        ensure!(
185            matches!(status, RegionEngineWithStatus::Ready(_)),
186            RegionNotReadySnafu { region_id }
187        );
188
189        self.inner
190            .table_provider_factory
191            .create(region_id, status.into_engine(), ctx)
192            .await
193            .context(ExecuteLogicalPlanSnafu)
194    }
195
196    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
197    pub async fn handle_remote_read(
198        &self,
199        request: api::v1::region::QueryRequest,
200    ) -> Result<SendableRecordBatchStream> {
201        let _permit = if let Some(p) = &self.inner.parallelism {
202            Some(p.acquire().await?)
203        } else {
204            None
205        };
206
207        let query_ctx: QueryContextRef = request
208            .header
209            .as_ref()
210            .map(|h| Arc::new(h.into()))
211            .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));
212
213        let region_id = RegionId::from_u64(request.region_id);
214        let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
215        let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
216
217        let decoder = self
218            .inner
219            .query_engine
220            .engine_context(query_ctx)
221            .new_plan_decoder()
222            .context(NewPlanDecoderSnafu)?;
223
224        let plan = decoder
225            .decode(Bytes::from(request.plan), catalog_list, false)
226            .await
227            .context(DecodeLogicalPlanSnafu)?;
228
229        self.inner
230            .handle_read(QueryRequest {
231                header: request.header,
232                region_id,
233                plan,
234            })
235            .await
236    }
237
238    #[tracing::instrument(skip_all)]
239    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
240        let _permit = if let Some(p) = &self.inner.parallelism {
241            Some(p.acquire().await?)
242        } else {
243            None
244        };
245
246        let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
247
248        let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
249
250        struct RegionDataSourceInjector {
251            source: Arc<dyn TableSource>,
252        }
253
254        impl TreeNodeRewriter for RegionDataSourceInjector {
255            type Node = LogicalPlan;
256
257            fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
258                Ok(match node {
259                    LogicalPlan::TableScan(mut scan) => {
260                        scan.source = self.source.clone();
261                        Transformed::yes(LogicalPlan::TableScan(scan))
262                    }
263                    _ => Transformed::no(node),
264                })
265            }
266        }
267
268        let plan = request
269            .plan
270            .rewrite(&mut RegionDataSourceInjector {
271                source: provider_as_source(provider),
272            })
273            .context(DataFusionSnafu)?
274            .data;
275
276        self.inner
277            .handle_read(QueryRequest { plan, ..request })
278            .await
279    }
280
281    /// Returns all opened and reportable regions.
282    ///
283    /// Notes: except all metrics regions.
284    pub fn reportable_regions(&self) -> Vec<RegionStat> {
285        self.inner
286            .region_map
287            .iter()
288            .filter_map(|e| {
289                let region_id = *e.key();
290                // Filters out any regions whose role equals None.
291                e.role(region_id).map(|role| RegionStat {
292                    region_id,
293                    engine: e.value().name().to_string(),
294                    role,
295                })
296            })
297            .collect()
298    }
299
300    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
301        self.inner.region_map.get(&region_id).and_then(|engine| {
302            engine.role(region_id).map(|role| match role {
303                RegionRole::Follower => false,
304                RegionRole::Leader => true,
305                RegionRole::DowngradingLeader => true,
306            })
307        })
308    }
309
310    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
311        let engine = self
312            .inner
313            .region_map
314            .get(&region_id)
315            .with_context(|| RegionNotFoundSnafu { region_id })?;
316        engine
317            .set_region_role(region_id, role)
318            .with_context(|_| HandleRegionRequestSnafu { region_id })
319    }
320
321    /// Set region role state gracefully.
322    ///
323    /// For [SettableRegionRoleState::Follower]:
324    /// After the call returns, the engine ensures that
325    /// no **further** write or flush operations will succeed in this region.
326    ///
327    /// For [SettableRegionRoleState::DowngradingLeader]:
328    /// After the call returns, the engine ensures that
329    /// no **further** write operations will succeed in this region.
330    pub async fn set_region_role_state_gracefully(
331        &self,
332        region_id: RegionId,
333        state: SettableRegionRoleState,
334    ) -> Result<SetRegionRoleStateResponse> {
335        match self.inner.region_map.get(&region_id) {
336            Some(engine) => Ok(engine
337                .set_region_role_state_gracefully(region_id, state)
338                .await
339                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
340            None => Ok(SetRegionRoleStateResponse::NotFound),
341        }
342    }
343
344    pub fn runtime(&self) -> Runtime {
345        self.inner.runtime.clone()
346    }
347
348    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
349        match self.inner.region_map.get(&region_id) {
350            Some(e) => e.region_statistic(region_id),
351            None => None,
352        }
353    }
354
355    /// Stop the region server.
356    pub async fn stop(&self) -> Result<()> {
357        self.inner.stop().await
358    }
359
360    #[cfg(test)]
361    /// Registers a region for test purpose.
362    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
363        self.inner
364            .region_map
365            .insert(region_id, RegionEngineWithStatus::Ready(engine));
366    }
367
368    async fn handle_batch_ddl_requests(
369        &self,
370        request: region_request::Body,
371    ) -> Result<RegionResponse> {
372        // Safety: we have already checked the request type in `RegionServer::handle()`.
373        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
374            .context(BuildRegionRequestsSnafu)?
375            .unwrap();
376        let tracing_context = TracingContext::from_current_span();
377
378        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
379        self.inner
380            .handle_batch_request(batch_request)
381            .trace(span)
382            .await
383    }
384
385    async fn handle_requests_in_parallel(
386        &self,
387        request: region_request::Body,
388    ) -> Result<RegionResponse> {
389        let requests =
390            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
391        let tracing_context = TracingContext::from_current_span();
392
393        let join_tasks = requests.into_iter().map(|(region_id, req)| {
394            let self_to_move = self;
395            let span = tracing_context.attach(info_span!(
396                "RegionServer::handle_region_request",
397                region_id = region_id.to_string()
398            ));
399            async move {
400                self_to_move
401                    .handle_request(region_id, req)
402                    .trace(span)
403                    .await
404            }
405        });
406
407        let results = try_join_all(join_tasks).await?;
408        let mut affected_rows = 0;
409        let mut extensions = HashMap::new();
410        for result in results {
411            affected_rows += result.affected_rows;
412            extensions.extend(result.extensions);
413        }
414
415        Ok(RegionResponse {
416            affected_rows,
417            extensions,
418            metadata: Vec::new(),
419        })
420    }
421
422    async fn handle_requests_in_serial(
423        &self,
424        request: region_request::Body,
425    ) -> Result<RegionResponse> {
426        let requests =
427            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
428        let tracing_context = TracingContext::from_current_span();
429
430        let mut affected_rows = 0;
431        let mut extensions = HashMap::new();
432        // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
433        // modify this part to avoid inefficient serial loop calls.
434        for (region_id, req) in requests {
435            let span = tracing_context.attach(info_span!(
436                "RegionServer::handle_region_request",
437                region_id = region_id.to_string()
438            ));
439            let result = self.handle_request(region_id, req).trace(span).await?;
440
441            affected_rows += result.affected_rows;
442            extensions.extend(result.extensions);
443        }
444
445        Ok(RegionResponse {
446            affected_rows,
447            extensions,
448            metadata: Vec::new(),
449        })
450    }
451
452    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
453        let region_id = RegionId::from_u64(request.region_id);
454        let manifest_info = request
455            .manifest_info
456            .context(error::MissingRequiredFieldSnafu {
457                name: "manifest_info",
458            })?;
459
460        let manifest_info = match manifest_info {
461            ManifestInfo::MitoManifestInfo(info) => {
462                RegionManifestInfo::mito(info.data_manifest_version, 0)
463            }
464            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
465                info.data_manifest_version,
466                0,
467                info.metadata_manifest_version,
468                0,
469            ),
470        };
471
472        let tracing_context = TracingContext::from_current_span();
473        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
474
475        self.sync_region(region_id, manifest_info)
476            .trace(span)
477            .await
478            .map(|_| RegionResponse::new(AffectedRows::default()))
479    }
480
481    /// Handles the ListMetadata request and retrieves metadata for specified regions.
482    ///
483    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
484    /// non-existing regions as `null`.
485    #[tracing::instrument(skip_all)]
486    async fn handle_list_metadata_request(
487        &self,
488        request: &ListMetadataRequest,
489    ) -> Result<RegionResponse> {
490        let mut region_metadatas = Vec::new();
491        // Collect metadata for each region
492        for region_id in &request.region_ids {
493            let region_id = RegionId::from_u64(*region_id);
494            // Get the engine.
495            let Some(engine) = self.find_engine(region_id)? else {
496                region_metadatas.push(None);
497                continue;
498            };
499
500            match engine.get_metadata(region_id).await {
501                Ok(metadata) => region_metadatas.push(Some(metadata)),
502                Err(err) => {
503                    if err.status_code() == StatusCode::RegionNotFound {
504                        region_metadatas.push(None);
505                    } else {
506                        Err(err).with_context(|_| GetRegionMetadataSnafu {
507                            engine: engine.name(),
508                            region_id,
509                        })?;
510                    }
511                }
512            }
513        }
514
515        // Serialize metadata to JSON
516        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
517
518        let response = RegionResponse::from_metadata(json_result);
519
520        Ok(response)
521    }
522
523    /// Sync region manifest and registers new opened logical regions.
524    pub async fn sync_region(
525        &self,
526        region_id: RegionId,
527        manifest_info: RegionManifestInfo,
528    ) -> Result<()> {
529        let engine_with_status = self
530            .inner
531            .region_map
532            .get(&region_id)
533            .with_context(|| RegionNotFoundSnafu { region_id })?;
534
535        self.inner
536            .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
537            .await
538    }
539}
540
541#[async_trait]
542impl RegionServerHandler for RegionServer {
543    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
544        let response = match &request {
545            region_request::Body::Creates(_)
546            | region_request::Body::Drops(_)
547            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
548            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
549                self.handle_requests_in_parallel(request).await
550            }
551            region_request::Body::Sync(sync_request) => {
552                self.handle_sync_region_request(sync_request).await
553            }
554            region_request::Body::ListMetadata(list_metadata_request) => {
555                self.handle_list_metadata_request(list_metadata_request)
556                    .await
557            }
558            _ => self.handle_requests_in_serial(request).await,
559        }
560        .map_err(BoxedError::new)
561        .context(ExecuteGrpcRequestSnafu)?;
562
563        Ok(RegionResponseV1 {
564            header: Some(ResponseHeader {
565                status: Some(Status {
566                    status_code: StatusCode::Success as _,
567                    ..Default::default()
568                }),
569            }),
570            affected_rows: response.affected_rows as _,
571            extensions: response.extensions,
572            metadata: response.metadata,
573        })
574    }
575}
576
577#[async_trait]
578impl FlightCraft for RegionServer {
579    async fn do_get(
580        &self,
581        request: Request<Ticket>,
582    ) -> TonicResult<Response<TonicStream<FlightData>>> {
583        let ticket = request.into_inner().ticket;
584        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
585            .context(servers_error::InvalidFlightTicketSnafu)?;
586        let tracing_context = request
587            .header
588            .as_ref()
589            .map(|h| TracingContext::from_w3c(&h.tracing_context))
590            .unwrap_or_default();
591
592        let result = self
593            .handle_remote_read(request)
594            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
595            .await?;
596
597        let stream = Box::pin(FlightRecordBatchStream::new(
598            result,
599            tracing_context,
600            self.flight_compression,
601        ));
602        Ok(Response::new(stream))
603    }
604}
605
/// The engine serving a region, tagged with the region's lifecycle status in the
/// region map.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
615
616impl RegionEngineWithStatus {
617    /// Returns [RegionEngineRef].
618    pub fn into_engine(self) -> RegionEngineRef {
619        match self {
620            RegionEngineWithStatus::Registering(engine) => engine,
621            RegionEngineWithStatus::Deregistering(engine) => engine,
622            RegionEngineWithStatus::Ready(engine) => engine,
623        }
624    }
625
626    /// Returns [RegionEngineRef] reference.
627    pub fn engine(&self) -> &RegionEngineRef {
628        match self {
629            RegionEngineWithStatus::Registering(engine) => engine,
630            RegionEngineWithStatus::Deregistering(engine) => engine,
631            RegionEngineWithStatus::Ready(engine) => engine,
632        }
633    }
634}
635
636impl Deref for RegionEngineWithStatus {
637    type Target = RegionEngineRef;
638
639    fn deref(&self) -> &Self::Target {
640        match self {
641            RegionEngineWithStatus::Registering(engine) => engine,
642            RegionEngineWithStatus::Deregistering(engine) => engine,
643            RegionEngineWithStatus::Ready(engine) => engine,
644        }
645    }
646}
647
/// State shared by all clones of [`RegionServer`].
struct RegionServerInner {
    /// Registered region engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Known regions, each with its engine and lifecycle status.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    query_engine: QueryEngineRef,
    runtime: Runtime,
    event_listener: RegionServerEventListenerRef,
    table_provider_factory: TableProviderFactoryRef,
    /// The number of queries allowed to be executed at the same time (`None` = no limit).
    /// Act as last line of defense on datanode to prevent query overloading.
    parallelism: Option<RegionServerParallelism>,
}
659
/// Limiter on concurrently executing reads.
struct RegionServerParallelism {
    /// Semaphore sized to the configured maximum number of concurrent queries.
    semaphore: Semaphore,
    /// Longest time a read waits for a free slot.
    timeout: Duration,
}
664
665impl RegionServerParallelism {
666    pub fn from_opts(
667        max_concurrent_queries: usize,
668        concurrent_query_limiter_timeout: Duration,
669    ) -> Option<Self> {
670        if max_concurrent_queries == 0 {
671            return None;
672        }
673        Some(RegionServerParallelism {
674            semaphore: Semaphore::new(max_concurrent_queries),
675            timeout: concurrent_query_limiter_timeout,
676        })
677    }
678
679    pub async fn acquire(&self) -> Result<SemaphorePermit> {
680        timeout(self.timeout, self.semaphore.acquire())
681            .await
682            .context(ConcurrentQueryLimiterTimeoutSnafu)?
683            .context(ConcurrentQueryLimiterClosedSnafu)
684    }
685}
686
/// Outcome of `RegionServerInner::get_engine`.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// No engine to dispatch to; carries the affected rows to report instead.
    /// `get_engine` produces `0` when a deregistration targets a region that is absent or
    /// already deregistering.
    EarlyReturn(AffectedRows),
}
691
692impl Debug for CurrentEngine {
693    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
694        match self {
695            CurrentEngine::Engine(engine) => f
696                .debug_struct("CurrentEngine")
697                .field("engine", &engine.name())
698                .finish(),
699            CurrentEngine::EarlyReturn(rows) => f
700                .debug_struct("CurrentEngine")
701                .field("return", rows)
702                .finish(),
703        }
704    }
705}
706
707impl RegionServerInner {
708    pub fn new(
709        query_engine: QueryEngineRef,
710        runtime: Runtime,
711        event_listener: RegionServerEventListenerRef,
712        table_provider_factory: TableProviderFactoryRef,
713        parallelism: Option<RegionServerParallelism>,
714    ) -> Self {
715        Self {
716            engines: RwLock::new(HashMap::new()),
717            region_map: DashMap::new(),
718            query_engine,
719            runtime,
720            event_listener,
721            table_provider_factory,
722            parallelism,
723        }
724    }
725
726    pub fn register_engine(&self, engine: RegionEngineRef) {
727        let engine_name = engine.name();
728        info!("Region Engine {engine_name} is registered");
729        self.engines
730            .write()
731            .unwrap()
732            .insert(engine_name.to_string(), engine);
733    }
734
735    fn get_engine(
736        &self,
737        region_id: RegionId,
738        region_change: &RegionChange,
739    ) -> Result<CurrentEngine> {
740        let current_region_status = self.region_map.get(&region_id);
741
742        let engine = match region_change {
743            RegionChange::Register(attribute) => match current_region_status {
744                Some(status) => match status.clone() {
745                    RegionEngineWithStatus::Registering(engine) => engine,
746                    RegionEngineWithStatus::Deregistering(_) => {
747                        return error::RegionBusySnafu { region_id }.fail()
748                    }
749                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
750                },
751                _ => self
752                    .engines
753                    .read()
754                    .unwrap()
755                    .get(attribute.engine())
756                    .with_context(|| RegionEngineNotFoundSnafu {
757                        name: attribute.engine(),
758                    })?
759                    .clone(),
760            },
761            RegionChange::Deregisters => match current_region_status {
762                Some(status) => match status.clone() {
763                    RegionEngineWithStatus::Registering(_) => {
764                        return error::RegionBusySnafu { region_id }.fail()
765                    }
766                    RegionEngineWithStatus::Deregistering(_) => {
767                        return Ok(CurrentEngine::EarlyReturn(0))
768                    }
769                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
770                },
771                None => return Ok(CurrentEngine::EarlyReturn(0)),
772            },
773            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
774                match current_region_status {
775                    Some(status) => match status.clone() {
776                        RegionEngineWithStatus::Registering(_) => {
777                            return error::RegionNotReadySnafu { region_id }.fail()
778                        }
779                        RegionEngineWithStatus::Deregistering(_) => {
780                            return error::RegionNotFoundSnafu { region_id }.fail()
781                        }
782                        RegionEngineWithStatus::Ready(engine) => engine,
783                    },
784                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
785                }
786            }
787        };
788
789        Ok(CurrentEngine::Engine(engine))
790    }
791
    /// Opens `requests` on a single `engine` as one batch and updates the region map.
    ///
    /// Steps:
    /// 1. Parses each request's region attribute and marks every region as not ready
    ///    (`RegionChange::Register`) before calling the engine.
    /// 2. Calls the engine's batch open with `parallelism`.
    /// 3. For each per-region success, marks the region ready and records its id. For each
    ///    per-region failure, unsets the region's status. If the whole batch call fails,
    ///    every region's status is unset.
    ///
    /// Returns the ids of regions that were opened and marked ready.
    ///
    /// # Errors
    /// - Fails before touching the engine if any region attribute fails to parse.
    /// - After the engine call, if *any* error was collected, returns an `Unexpected`
    ///   error describing the first one. This happens even when other regions in the batch
    ///   opened successfully: those stay marked ready, but their ids are not returned.
    async fn handle_batch_open_requests_inner(
        &self,
        engine: RegionEngineRef,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<RegionId>> {
        // Fail fast on an unparsable attribute, before any region status is touched.
        let region_changes = requests
            .iter()
            .map(|(region_id, open)| {
                let attribute = parse_region_attribute(&open.engine, &open.options)?;
                Ok((*region_id, RegionChange::Register(attribute)))
            })
            .collect::<Result<HashMap<_, _>>>()?;

        // Mark every region as not ready while the engine opens it.
        for (&region_id, region_change) in &region_changes {
            self.set_region_status_not_ready(region_id, &engine, region_change)
        }

        let mut open_regions = Vec::with_capacity(requests.len());
        let mut errors = vec![];
        match engine
            .handle_batch_open_requests(parallelism, requests)
            .await
            .with_context(|_| HandleBatchOpenRequestSnafu)
        {
            Ok(results) => {
                for (region_id, result) in results {
                    // NOTE(review): indexing panics if the engine reports a region id that was
                    // not in `requests`; presumably the engine only reports requested regions.
                    let region_change = &region_changes[&region_id];
                    match result {
                        Ok(_) => {
                            // NOTE(review): if this fails, the region's not-ready status is not
                            // unset here — confirm whether `set_region_status_ready` cleans up.
                            if let Err(e) = self
                                .set_region_status_ready(region_id, engine.clone(), *region_change)
                                .await
                            {
                                error!(e; "Failed to set region to ready: {}", region_id);
                                errors.push(BoxedError::new(e));
                            } else {
                                open_regions.push(region_id)
                            }
                        }
                        Err(e) => {
                            // Roll back the not-ready marker for this region only.
                            self.unset_region_status(region_id, &engine, *region_change);
                            error!(e; "Failed to open region: {}", region_id);
                            errors.push(e);
                        }
                    }
                }
            }
            Err(e) => {
                // The whole batch failed: roll back every region's not-ready marker.
                for (&region_id, region_change) in &region_changes {
                    self.unset_region_status(region_id, &engine, *region_change);
                }
                error!(e; "Failed to open batch regions");
                errors.push(BoxedError::new(e));
            }
        }

        if !errors.is_empty() {
            return error::UnexpectedSnafu {
                // Returns the first error.
                violated: format!("Failed to open batch regions: {:?}", errors[0]),
            }
            .fail();
        }

        Ok(open_regions)
    }
859
860    pub async fn handle_batch_open_requests(
861        &self,
862        parallelism: usize,
863        requests: Vec<(RegionId, RegionOpenRequest)>,
864    ) -> Result<Vec<RegionId>> {
865        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
866            HashMap::with_capacity(requests.len());
867        for (region_id, request) in requests {
868            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
869                requests.push((region_id, request));
870            } else {
871                engine_grouped_requests
872                    .insert(request.engine.to_string(), vec![(region_id, request)]);
873            }
874        }
875
876        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
877        for (engine, requests) in engine_grouped_requests {
878            let engine = self
879                .engines
880                .read()
881                .unwrap()
882                .get(&engine)
883                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
884                .clone();
885            results.push(
886                self.handle_batch_open_requests_inner(engine, parallelism, requests)
887                    .await,
888            )
889        }
890
891        Ok(results
892            .into_iter()
893            .collect::<Result<Vec<_>>>()?
894            .into_iter()
895            .flatten()
896            .collect::<Vec<_>>())
897    }
898
899    // Handle requests in batch.
900    //
901    // limitation: all create requests must be in the same engine.
902    pub async fn handle_batch_request(
903        &self,
904        batch_request: BatchRegionDdlRequest,
905    ) -> Result<RegionResponse> {
906        let region_changes = match &batch_request {
907            BatchRegionDdlRequest::Create(requests) => requests
908                .iter()
909                .map(|(region_id, create)| {
910                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
911                    Ok((*region_id, RegionChange::Register(attribute)))
912                })
913                .collect::<Result<Vec<_>>>()?,
914            BatchRegionDdlRequest::Drop(requests) => requests
915                .iter()
916                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
917                .collect::<Vec<_>>(),
918            BatchRegionDdlRequest::Alter(requests) => requests
919                .iter()
920                .map(|(region_id, _)| (*region_id, RegionChange::None))
921                .collect::<Vec<_>>(),
922        };
923
924        // The ddl procedure will ensure all requests are in the same engine.
925        // Therefore, we can get the engine from the first request.
926        let (first_region_id, first_region_change) = region_changes.first().unwrap();
927        let engine = match self.get_engine(*first_region_id, first_region_change)? {
928            CurrentEngine::Engine(engine) => engine,
929            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
930        };
931
932        for (region_id, region_change) in region_changes.iter() {
933            self.set_region_status_not_ready(*region_id, &engine, region_change);
934        }
935
936        let ddl_type = batch_request.request_type();
937        let result = engine
938            .handle_batch_ddl_requests(batch_request)
939            .await
940            .context(HandleBatchDdlRequestSnafu { ddl_type });
941
942        match result {
943            Ok(result) => {
944                for (region_id, region_change) in &region_changes {
945                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
946                        .await?;
947                }
948
949                Ok(RegionResponse {
950                    affected_rows: result.affected_rows,
951                    extensions: result.extensions,
952                    metadata: Vec::new(),
953                })
954            }
955            Err(err) => {
956                for (region_id, region_change) in region_changes {
957                    self.unset_region_status(region_id, &engine, region_change);
958                }
959
960                Err(err)
961            }
962        }
963    }
964
965    pub async fn handle_request(
966        &self,
967        region_id: RegionId,
968        request: RegionRequest,
969    ) -> Result<RegionResponse> {
970        let request_type = request.request_type();
971        let region_id_str = region_id.to_string();
972        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
973            .with_label_values(&[&region_id_str, request_type])
974            .start_timer();
975
976        let region_change = match &request {
977            RegionRequest::Create(create) => {
978                let attribute = parse_region_attribute(&create.engine, &create.options)?;
979                RegionChange::Register(attribute)
980            }
981            RegionRequest::Open(open) => {
982                let attribute = parse_region_attribute(&open.engine, &open.options)?;
983                RegionChange::Register(attribute)
984            }
985            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
986            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
987                RegionChange::Ingest
988            }
989            RegionRequest::Alter(_)
990            | RegionRequest::Flush(_)
991            | RegionRequest::Compact(_)
992            | RegionRequest::Truncate(_) => RegionChange::None,
993            RegionRequest::Catchup(_) => RegionChange::Catchup,
994        };
995
996        let engine = match self.get_engine(region_id, &region_change)? {
997            CurrentEngine::Engine(engine) => engine,
998            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
999        };
1000
1001        // Sets corresponding region status to registering/deregistering before the operation.
1002        self.set_region_status_not_ready(region_id, &engine, &region_change);
1003
1004        match engine
1005            .handle_request(region_id, request)
1006            .await
1007            .with_context(|_| HandleRegionRequestSnafu { region_id })
1008        {
1009            Ok(result) => {
1010                // Update metrics
1011                if matches!(region_change, RegionChange::Ingest) {
1012                    crate::metrics::REGION_CHANGED_ROW_COUNT
1013                        .with_label_values(&[&region_id_str, request_type])
1014                        .inc_by(result.affected_rows as u64);
1015                }
1016                // Sets corresponding region status to ready.
1017                self.set_region_status_ready(region_id, engine.clone(), region_change)
1018                    .await?;
1019
1020                Ok(RegionResponse {
1021                    affected_rows: result.affected_rows,
1022                    extensions: result.extensions,
1023                    metadata: Vec::new(),
1024                })
1025            }
1026            Err(err) => {
1027                // Removes the region status if the operation fails.
1028                self.unset_region_status(region_id, &engine, region_change);
1029                Err(err)
1030            }
1031        }
1032    }
1033
1034    /// Handles the sync region request.
1035    pub async fn handle_sync_region(
1036        &self,
1037        engine: &RegionEngineRef,
1038        region_id: RegionId,
1039        manifest_info: RegionManifestInfo,
1040    ) -> Result<()> {
1041        let Some(new_opened_regions) = engine
1042            .sync_region(region_id, manifest_info)
1043            .await
1044            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1045            .new_opened_logical_region_ids()
1046        else {
1047            warn!("No new opened logical regions");
1048            return Ok(());
1049        };
1050
1051        for region in new_opened_regions {
1052            self.region_map
1053                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1054            info!("Logical region {} is registered!", region);
1055        }
1056
1057        Ok(())
1058    }
1059
1060    fn set_region_status_not_ready(
1061        &self,
1062        region_id: RegionId,
1063        engine: &RegionEngineRef,
1064        region_change: &RegionChange,
1065    ) {
1066        match region_change {
1067            RegionChange::Register(_) => {
1068                self.region_map.insert(
1069                    region_id,
1070                    RegionEngineWithStatus::Registering(engine.clone()),
1071                );
1072            }
1073            RegionChange::Deregisters => {
1074                self.region_map.insert(
1075                    region_id,
1076                    RegionEngineWithStatus::Deregistering(engine.clone()),
1077                );
1078            }
1079            _ => {}
1080        }
1081    }
1082
1083    fn unset_region_status(
1084        &self,
1085        region_id: RegionId,
1086        engine: &RegionEngineRef,
1087        region_change: RegionChange,
1088    ) {
1089        match region_change {
1090            RegionChange::None | RegionChange::Ingest => {}
1091            RegionChange::Register(_) => {
1092                self.region_map.remove(&region_id);
1093            }
1094            RegionChange::Deregisters => {
1095                self.region_map
1096                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1097            }
1098            RegionChange::Catchup => {}
1099        }
1100    }
1101
1102    async fn set_region_status_ready(
1103        &self,
1104        region_id: RegionId,
1105        engine: RegionEngineRef,
1106        region_change: RegionChange,
1107    ) -> Result<()> {
1108        let engine_type = engine.name();
1109        match region_change {
1110            RegionChange::None | RegionChange::Ingest => {}
1111            RegionChange::Register(attribute) => {
1112                info!(
1113                    "Region {region_id} is registered to engine {}",
1114                    attribute.engine()
1115                );
1116                self.region_map
1117                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1118
1119                match attribute {
1120                    RegionAttribute::Metric { physical } => {
1121                        if physical {
1122                            // Registers the logical regions belong to the physical region (`region_id`).
1123                            self.register_logical_regions(&engine, region_id).await?;
1124                            // We only send the `on_region_registered` event of the physical region.
1125                            self.event_listener.on_region_registered(region_id);
1126                        }
1127                    }
1128                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1129                    RegionAttribute::File => {
1130                        // do nothing
1131                    }
1132                }
1133            }
1134            RegionChange::Deregisters => {
1135                info!("Region {region_id} is deregistered from engine {engine_type}");
1136                self.region_map
1137                    .remove(&region_id)
1138                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1139                self.event_listener.on_region_deregistered(region_id);
1140            }
1141            RegionChange::Catchup => {
1142                if is_metric_engine(engine.name()) {
1143                    // Registers the logical regions belong to the physical region (`region_id`).
1144                    self.register_logical_regions(&engine, region_id).await?;
1145                }
1146            }
1147        }
1148        Ok(())
1149    }
1150
1151    async fn register_logical_regions(
1152        &self,
1153        engine: &RegionEngineRef,
1154        physical_region_id: RegionId,
1155    ) -> Result<()> {
1156        let metric_engine =
1157            engine
1158                .as_any()
1159                .downcast_ref::<MetricEngine>()
1160                .context(UnexpectedSnafu {
1161                    violated: format!(
1162                        "expecting engine type '{}', actual '{}'",
1163                        METRIC_ENGINE_NAME,
1164                        engine.name(),
1165                    ),
1166                })?;
1167
1168        let logical_regions = metric_engine
1169            .logical_regions(physical_region_id)
1170            .await
1171            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1172
1173        for region in logical_regions {
1174            self.region_map
1175                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1176            info!("Logical region {} is registered!", region);
1177        }
1178        Ok(())
1179    }
1180
1181    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
1182        // TODO(ruihang): add metrics and set trace id
1183
1184        // Build query context from gRPC header
1185        let query_ctx: QueryContextRef = request
1186            .header
1187            .as_ref()
1188            .map(|h| Arc::new(h.into()))
1189            .unwrap_or_else(|| QueryContextBuilder::default().build().into());
1190
1191        let result = self
1192            .query_engine
1193            .execute(request.plan, query_ctx)
1194            .await
1195            .context(ExecuteLogicalPlanSnafu)?;
1196
1197        match result.data {
1198            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1199                UnsupportedOutputSnafu { expected: "stream" }.fail()
1200            }
1201            OutputData::Stream(stream) => Ok(stream),
1202        }
1203    }
1204
1205    async fn stop(&self) -> Result<()> {
1206        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1207        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1208        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1209        //
1210        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1211        // it here, collect the values first then use later separately.
1212
1213        let regions = self
1214            .region_map
1215            .iter()
1216            .map(|x| (*x.key(), x.value().clone()))
1217            .collect::<Vec<_>>();
1218        let num_regions = regions.len();
1219
1220        for (region_id, engine) in regions {
1221            let closed = engine
1222                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1223                .await;
1224            match closed {
1225                Ok(_) => debug!("Region {region_id} is closed"),
1226                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1227            }
1228        }
1229        self.region_map.clear();
1230        info!("closed {num_regions} regions");
1231
1232        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1233        for (engine_name, engine) in engines {
1234            engine
1235                .stop()
1236                .await
1237                .context(StopRegionEngineSnafu { name: &engine_name })?;
1238            info!("Region engine {engine_name} is stopped");
1239        }
1240
1241        Ok(())
1242    }
1243}
1244
/// How a region request affects the region's entry in `region_map`.
///
/// It is derived from the request before the request is handed to the engine. It decides
/// which not-ready status the region enters, and whether that change is completed (on
/// success) or rolled back (on failure).
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change registration (alter, flush, compact, truncate).
    None,
    /// The region is being created or opened. Carries the attribute parsed from the
    /// request's engine name and options.
    Register(RegionAttribute),
    /// The region is being closed or dropped.
    Deregisters,
    /// The region is catching up. For the metric engine, its logical regions are registered
    /// once the request succeeds.
    Catchup,
    /// The request writes data (put, delete, bulk inserts). Its affected rows are added to
    /// the changed-row-count metric.
    Ingest,
}
1253
/// Returns `true` when `engine` is the metric engine's name.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1257
1258fn parse_region_attribute(
1259    engine: &str,
1260    options: &HashMap<String, String>,
1261) -> Result<RegionAttribute> {
1262    match engine {
1263        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1264        METRIC_ENGINE_NAME => {
1265            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1266
1267            Ok(RegionAttribute::Metric { physical })
1268        }
1269        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1270        _ => error::UnexpectedSnafu {
1271            violated: format!("Unknown engine: {}", engine),
1272        }
1273        .fail(),
1274    }
1275}
1276
/// Engine-specific attributes of a region being registered, as derived by
/// `parse_region_attribute` from the engine name and region options.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region of the mito engine.
    Mito,
    /// A region of the metric engine. `physical` is `false` when the region options contain
    /// `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region of the file engine.
    File,
}
1283
1284impl RegionAttribute {
1285    fn engine(&self) -> &'static str {
1286        match self {
1287            RegionAttribute::Mito => MITO_ENGINE_NAME,
1288            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1289            RegionAttribute::File => FILE_ENGINE_NAME,
1290        }
1291    }
1292}
1293
1294#[cfg(test)]
1295mod tests {
1296
1297    use std::assert_matches::assert_matches;
1298
1299    use api::v1::SemanticType;
1300    use common_error::ext::ErrorExt;
1301    use datatypes::prelude::ConcreteDataType;
1302    use mito2::test_util::CreateRequestBuilder;
1303    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1304    use store_api::region_engine::RegionEngine;
1305    use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
1306    use store_api::storage::RegionId;
1307
1308    use super::*;
1309    use crate::error::Result;
1310    use crate::tests::{mock_region_server, MockRegionEngine};
1311
1312    #[tokio::test]
1313    async fn test_region_registering() {
1314        common_telemetry::init_default_ut_logging();
1315
1316        let mut mock_region_server = mock_region_server();
1317        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1318        let engine_name = engine.name();
1319        mock_region_server.register_engine(engine.clone());
1320        let region_id = RegionId::new(1, 1);
1321        let builder = CreateRequestBuilder::new();
1322        let create_req = builder.build();
1323        // Tries to create/open a registering region.
1324        mock_region_server.inner.region_map.insert(
1325            region_id,
1326            RegionEngineWithStatus::Registering(engine.clone()),
1327        );
1328        let response = mock_region_server
1329            .handle_request(region_id, RegionRequest::Create(create_req))
1330            .await
1331            .unwrap();
1332        assert_eq!(response.affected_rows, 0);
1333        let status = mock_region_server
1334            .inner
1335            .region_map
1336            .get(&region_id)
1337            .unwrap()
1338            .clone();
1339        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1340
1341        mock_region_server.inner.region_map.insert(
1342            region_id,
1343            RegionEngineWithStatus::Registering(engine.clone()),
1344        );
1345        let response = mock_region_server
1346            .handle_request(
1347                region_id,
1348                RegionRequest::Open(RegionOpenRequest {
1349                    engine: engine_name.to_string(),
1350                    region_dir: String::new(),
1351                    options: Default::default(),
1352                    skip_wal_replay: false,
1353                }),
1354            )
1355            .await
1356            .unwrap();
1357        assert_eq!(response.affected_rows, 0);
1358        let status = mock_region_server
1359            .inner
1360            .region_map
1361            .get(&region_id)
1362            .unwrap()
1363            .clone();
1364        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1365    }
1366
1367    #[tokio::test]
1368    async fn test_region_deregistering() {
1369        common_telemetry::init_default_ut_logging();
1370
1371        let mut mock_region_server = mock_region_server();
1372        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1373
1374        mock_region_server.register_engine(engine.clone());
1375
1376        let region_id = RegionId::new(1, 1);
1377
1378        // Tries to drop/close a registering region.
1379        mock_region_server.inner.region_map.insert(
1380            region_id,
1381            RegionEngineWithStatus::Deregistering(engine.clone()),
1382        );
1383
1384        let response = mock_region_server
1385            .handle_request(
1386                region_id,
1387                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1388            )
1389            .await
1390            .unwrap();
1391        assert_eq!(response.affected_rows, 0);
1392
1393        let status = mock_region_server
1394            .inner
1395            .region_map
1396            .get(&region_id)
1397            .unwrap()
1398            .clone();
1399        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1400
1401        mock_region_server.inner.region_map.insert(
1402            region_id,
1403            RegionEngineWithStatus::Deregistering(engine.clone()),
1404        );
1405
1406        let response = mock_region_server
1407            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1408            .await
1409            .unwrap();
1410        assert_eq!(response.affected_rows, 0);
1411
1412        let status = mock_region_server
1413            .inner
1414            .region_map
1415            .get(&region_id)
1416            .unwrap()
1417            .clone();
1418        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1419    }
1420
1421    #[tokio::test]
1422    async fn test_region_not_ready() {
1423        common_telemetry::init_default_ut_logging();
1424
1425        let mut mock_region_server = mock_region_server();
1426        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1427
1428        mock_region_server.register_engine(engine.clone());
1429
1430        let region_id = RegionId::new(1, 1);
1431
1432        // Tries to drop/close a registering region.
1433        mock_region_server.inner.region_map.insert(
1434            region_id,
1435            RegionEngineWithStatus::Registering(engine.clone()),
1436        );
1437
1438        let err = mock_region_server
1439            .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
1440            .await
1441            .unwrap_err();
1442
1443        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1444    }
1445
1446    #[tokio::test]
1447    async fn test_region_request_failed() {
1448        common_telemetry::init_default_ut_logging();
1449
1450        let mut mock_region_server = mock_region_server();
1451        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1452            MITO_ENGINE_NAME,
1453            Box::new(|_region_id, _request| {
1454                error::UnexpectedSnafu {
1455                    violated: "test".to_string(),
1456                }
1457                .fail()
1458            }),
1459        );
1460
1461        mock_region_server.register_engine(engine.clone());
1462
1463        let region_id = RegionId::new(1, 1);
1464        let builder = CreateRequestBuilder::new();
1465        let create_req = builder.build();
1466        mock_region_server
1467            .handle_request(region_id, RegionRequest::Create(create_req))
1468            .await
1469            .unwrap_err();
1470
1471        let status = mock_region_server.inner.region_map.get(&region_id);
1472        assert!(status.is_none());
1473
1474        mock_region_server
1475            .inner
1476            .region_map
1477            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1478
1479        mock_region_server
1480            .handle_request(
1481                region_id,
1482                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1483            )
1484            .await
1485            .unwrap_err();
1486
1487        let status = mock_region_server.inner.region_map.get(&region_id);
1488        assert!(status.is_some());
1489    }
1490
1491    struct CurrentEngineTest {
1492        region_id: RegionId,
1493        current_region_status: Option<RegionEngineWithStatus>,
1494        region_change: RegionChange,
1495        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1496    }
1497
1498    #[tokio::test]
1499    async fn test_current_engine() {
1500        common_telemetry::init_default_ut_logging();
1501
1502        let mut mock_region_server = mock_region_server();
1503        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1504        mock_region_server.register_engine(engine.clone());
1505
1506        let region_id = RegionId::new(1024, 1);
1507        let tests = vec![
1508            // RegionChange::None
1509            CurrentEngineTest {
1510                region_id,
1511                current_region_status: None,
1512                region_change: RegionChange::None,
1513                assert: Box::new(|result| {
1514                    let err = result.unwrap_err();
1515                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1516                }),
1517            },
1518            CurrentEngineTest {
1519                region_id,
1520                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1521                region_change: RegionChange::None,
1522                assert: Box::new(|result| {
1523                    let current_engine = result.unwrap();
1524                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1525                }),
1526            },
1527            CurrentEngineTest {
1528                region_id,
1529                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1530                region_change: RegionChange::None,
1531                assert: Box::new(|result| {
1532                    let err = result.unwrap_err();
1533                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1534                }),
1535            },
1536            CurrentEngineTest {
1537                region_id,
1538                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1539                region_change: RegionChange::None,
1540                assert: Box::new(|result| {
1541                    let err = result.unwrap_err();
1542                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1543                }),
1544            },
1545            // RegionChange::Register
1546            CurrentEngineTest {
1547                region_id,
1548                current_region_status: None,
1549                region_change: RegionChange::Register(RegionAttribute::Mito),
1550                assert: Box::new(|result| {
1551                    let current_engine = result.unwrap();
1552                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1553                }),
1554            },
1555            CurrentEngineTest {
1556                region_id,
1557                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1558                region_change: RegionChange::Register(RegionAttribute::Mito),
1559                assert: Box::new(|result| {
1560                    let current_engine = result.unwrap();
1561                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1562                }),
1563            },
1564            CurrentEngineTest {
1565                region_id,
1566                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1567                region_change: RegionChange::Register(RegionAttribute::Mito),
1568                assert: Box::new(|result| {
1569                    let err = result.unwrap_err();
1570                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1571                }),
1572            },
1573            CurrentEngineTest {
1574                region_id,
1575                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1576                region_change: RegionChange::Register(RegionAttribute::Mito),
1577                assert: Box::new(|result| {
1578                    let current_engine = result.unwrap();
1579                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1580                }),
1581            },
1582            // RegionChange::Deregister
1583            CurrentEngineTest {
1584                region_id,
1585                current_region_status: None,
1586                region_change: RegionChange::Deregisters,
1587                assert: Box::new(|result| {
1588                    let current_engine = result.unwrap();
1589                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1590                }),
1591            },
1592            CurrentEngineTest {
1593                region_id,
1594                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1595                region_change: RegionChange::Deregisters,
1596                assert: Box::new(|result| {
1597                    let err = result.unwrap_err();
1598                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1599                }),
1600            },
1601            CurrentEngineTest {
1602                region_id,
1603                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1604                region_change: RegionChange::Deregisters,
1605                assert: Box::new(|result| {
1606                    let current_engine = result.unwrap();
1607                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1608                }),
1609            },
1610            CurrentEngineTest {
1611                region_id,
1612                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1613                region_change: RegionChange::Deregisters,
1614                assert: Box::new(|result| {
1615                    let current_engine = result.unwrap();
1616                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1617                }),
1618            },
1619        ];
1620
1621        for test in tests {
1622            let CurrentEngineTest {
1623                region_id,
1624                current_region_status,
1625                region_change,
1626                assert,
1627            } = test;
1628
1629            // Sets up
1630            if let Some(status) = current_region_status {
1631                mock_region_server
1632                    .inner
1633                    .region_map
1634                    .insert(region_id, status);
1635            } else {
1636                mock_region_server.inner.region_map.remove(&region_id);
1637            }
1638
1639            let result = mock_region_server
1640                .inner
1641                .get_engine(region_id, &region_change);
1642
1643            assert(result);
1644        }
1645    }
1646
1647    #[tokio::test]
1648    async fn test_region_server_parallelism() {
1649        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1650        let first_query = p.acquire().await;
1651        assert!(first_query.is_ok());
1652        let second_query = p.acquire().await;
1653        assert!(second_query.is_ok());
1654        let third_query = p.acquire().await;
1655        assert!(third_query.is_err());
1656        let err = third_query.unwrap_err();
1657        assert_eq!(
1658            err.output_msg(),
1659            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1660        );
1661        drop(first_query);
1662        let forth_query = p.acquire().await;
1663        assert!(forth_query.is_ok());
1664    }
1665
1666    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1667        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1668        metadata_builder.push_column_metadata(ColumnMetadata {
1669            column_schema: datatypes::schema::ColumnSchema::new(
1670                "timestamp",
1671                ConcreteDataType::timestamp_nanosecond_datatype(),
1672                false,
1673            ),
1674            semantic_type: SemanticType::Timestamp,
1675            column_id: 0,
1676        });
1677        metadata_builder.push_column_metadata(ColumnMetadata {
1678            column_schema: datatypes::schema::ColumnSchema::new(
1679                "file",
1680                ConcreteDataType::string_datatype(),
1681                true,
1682            ),
1683            semantic_type: SemanticType::Tag,
1684            column_id: 1,
1685        });
1686        metadata_builder.push_column_metadata(ColumnMetadata {
1687            column_schema: datatypes::schema::ColumnSchema::new(
1688                "message",
1689                ConcreteDataType::string_datatype(),
1690                true,
1691            ),
1692            semantic_type: SemanticType::Field,
1693            column_id: 2,
1694        });
1695        metadata_builder.primary_key(vec![1]);
1696        metadata_builder.build().unwrap()
1697    }
1698
1699    #[tokio::test]
1700    async fn test_handle_list_metadata_request() {
1701        common_telemetry::init_default_ut_logging();
1702
1703        let mut mock_region_server = mock_region_server();
1704        let region_id_1 = RegionId::new(1, 0);
1705        let region_id_2 = RegionId::new(2, 0);
1706
1707        let metadata_1 = mock_region_metadata(region_id_1);
1708        let metadata_2 = mock_region_metadata(region_id_2);
1709        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
1710
1711        let metadata_1 = Arc::new(metadata_1);
1712        let metadata_2 = Arc::new(metadata_2);
1713        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1714            MITO_ENGINE_NAME,
1715            Box::new(move |region_id| {
1716                if region_id == region_id_1 {
1717                    Ok(metadata_1.clone())
1718                } else if region_id == region_id_2 {
1719                    Ok(metadata_2.clone())
1720                } else {
1721                    error::RegionNotFoundSnafu { region_id }.fail()
1722                }
1723            }),
1724        );
1725
1726        mock_region_server.register_engine(engine.clone());
1727        mock_region_server
1728            .inner
1729            .region_map
1730            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1731        mock_region_server
1732            .inner
1733            .region_map
1734            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1735
1736        // All regions exist.
1737        let list_metadata_request = ListMetadataRequest {
1738            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1739        };
1740        let response = mock_region_server
1741            .handle_list_metadata_request(&list_metadata_request)
1742            .await
1743            .unwrap();
1744        let decoded_metadata: Vec<Option<RegionMetadata>> =
1745            serde_json::from_slice(&response.metadata).unwrap();
1746        assert_eq!(metadatas, decoded_metadata);
1747    }
1748
1749    #[tokio::test]
1750    async fn test_handle_list_metadata_not_found() {
1751        common_telemetry::init_default_ut_logging();
1752
1753        let mut mock_region_server = mock_region_server();
1754        let region_id_1 = RegionId::new(1, 0);
1755        let region_id_2 = RegionId::new(2, 0);
1756
1757        let metadata_1 = mock_region_metadata(region_id_1);
1758        let metadatas = vec![Some(metadata_1.clone()), None];
1759
1760        let metadata_1 = Arc::new(metadata_1);
1761        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1762            MITO_ENGINE_NAME,
1763            Box::new(move |region_id| {
1764                if region_id == region_id_1 {
1765                    Ok(metadata_1.clone())
1766                } else {
1767                    error::RegionNotFoundSnafu { region_id }.fail()
1768                }
1769            }),
1770        );
1771
1772        mock_region_server.register_engine(engine.clone());
1773        mock_region_server
1774            .inner
1775            .region_map
1776            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1777
1778        // Not in region map.
1779        let list_metadata_request = ListMetadataRequest {
1780            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1781        };
1782        let response = mock_region_server
1783            .handle_list_metadata_request(&list_metadata_request)
1784            .await
1785            .unwrap();
1786        let decoded_metadata: Vec<Option<RegionMetadata>> =
1787            serde_json::from_slice(&response.metadata).unwrap();
1788        assert_eq!(metadatas, decoded_metadata);
1789
1790        // Not in region engine.
1791        mock_region_server
1792            .inner
1793            .region_map
1794            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1795        let response = mock_region_server
1796            .handle_list_metadata_request(&list_metadata_request)
1797            .await
1798            .unwrap();
1799        let decoded_metadata: Vec<Option<RegionMetadata>> =
1800            serde_json::from_slice(&response.metadata).unwrap();
1801        assert_eq!(metadatas, decoded_metadata);
1802    }
1803
1804    #[tokio::test]
1805    async fn test_handle_list_metadata_failed() {
1806        common_telemetry::init_default_ut_logging();
1807
1808        let mut mock_region_server = mock_region_server();
1809        let region_id_1 = RegionId::new(1, 0);
1810
1811        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1812            MITO_ENGINE_NAME,
1813            Box::new(move |region_id| {
1814                error::UnexpectedSnafu {
1815                    violated: format!("Failed to get region {region_id}"),
1816                }
1817                .fail()
1818            }),
1819        );
1820
1821        mock_region_server.register_engine(engine.clone());
1822        mock_region_server
1823            .inner
1824            .region_map
1825            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1826
1827        // Failed to get.
1828        let list_metadata_request = ListMetadataRequest {
1829            region_ids: vec![region_id_1.as_u64()],
1830        };
1831        mock_region_server
1832            .handle_list_metadata_request(&list_metadata_request)
1833            .await
1834            .unwrap_err();
1835    }
1836}