datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use api::region::RegionResponse;
22use api::v1::meta::TopicStat;
23use api::v1::region::sync_request::ManifestInfo;
24use api::v1::region::{
25    region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
26};
27use api::v1::{ResponseHeader, Status};
28use arrow_flight::{FlightData, Ticket};
29use async_trait::async_trait;
30use bytes::Bytes;
31use common_error::ext::{BoxedError, ErrorExt};
32use common_error::status_code::StatusCode;
33use common_meta::datanode::TopicStatsReporter;
34use common_query::request::QueryRequest;
35use common_query::OutputData;
36use common_recordbatch::SendableRecordBatchStream;
37use common_runtime::Runtime;
38use common_telemetry::tracing::{self, info_span};
39use common_telemetry::tracing_context::{FutureExt, TracingContext};
40use common_telemetry::{debug, error, info, warn};
41use dashmap::DashMap;
42use datafusion::datasource::{provider_as_source, TableProvider};
43use datafusion::error::Result as DfResult;
44use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
45use datafusion_expr::{LogicalPlan, TableSource};
46use futures_util::future::try_join_all;
47use metric_engine::engine::MetricEngine;
48use mito2::engine::MITO_ENGINE_NAME;
49use prost::Message;
50pub use query::dummy_catalog::{
51    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
52};
53use query::QueryEngineRef;
54use serde_json;
55use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
56use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
57use servers::grpc::region_server::RegionServerHandler;
58use servers::grpc::FlightCompression;
59use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
60use snafu::{ensure, OptionExt, ResultExt};
61use store_api::metric_engine_consts::{
62    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
63};
64use store_api::region_engine::{
65    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
66    SettableRegionRoleState,
67};
68use store_api::region_request::{
69    AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
70};
71use store_api::storage::RegionId;
72use tokio::sync::{Semaphore, SemaphorePermit};
73use tokio::time::timeout;
74use tonic::{Request, Response, Result as TonicResult};
75
76use crate::error::{
77    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
78    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
79    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
80    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
81    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
82    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
83};
84use crate::event_listener::RegionServerEventListenerRef;
85
86#[derive(Clone)]
87pub struct RegionServer {
88    inner: Arc<RegionServerInner>,
89    flight_compression: FlightCompression,
90}
91
92pub struct RegionStat {
93    pub region_id: RegionId,
94    pub engine: String,
95    pub role: RegionRole,
96}
97
98impl RegionServer {
99    pub fn new(
100        query_engine: QueryEngineRef,
101        runtime: Runtime,
102        event_listener: RegionServerEventListenerRef,
103        flight_compression: FlightCompression,
104    ) -> Self {
105        Self::with_table_provider(
106            query_engine,
107            runtime,
108            event_listener,
109            Arc::new(DummyTableProviderFactory),
110            0,
111            Duration::from_millis(0),
112            flight_compression,
113        )
114    }
115
116    pub fn with_table_provider(
117        query_engine: QueryEngineRef,
118        runtime: Runtime,
119        event_listener: RegionServerEventListenerRef,
120        table_provider_factory: TableProviderFactoryRef,
121        max_concurrent_queries: usize,
122        concurrent_query_limiter_timeout: Duration,
123        flight_compression: FlightCompression,
124    ) -> Self {
125        Self {
126            inner: Arc::new(RegionServerInner::new(
127                query_engine,
128                runtime,
129                event_listener,
130                table_provider_factory,
131                RegionServerParallelism::from_opts(
132                    max_concurrent_queries,
133                    concurrent_query_limiter_timeout,
134                ),
135            )),
136            flight_compression,
137        }
138    }
139
140    /// Registers an engine.
141    pub fn register_engine(&mut self, engine: RegionEngineRef) {
142        self.inner.register_engine(engine);
143    }
144
145    /// Sets the topic stats.
146    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
147        self.inner.set_topic_stats_reporter(topic_stats_reporter);
148    }
149
150    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
151    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
152        match self.inner.get_engine(region_id, &RegionChange::None) {
153            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
154            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
155            Err(error::Error::RegionNotFound { .. }) => Ok(None),
156            Err(err) => Err(err),
157        }
158    }
159
160    #[tracing::instrument(skip_all)]
161    pub async fn handle_batch_open_requests(
162        &self,
163        parallelism: usize,
164        requests: Vec<(RegionId, RegionOpenRequest)>,
165        ignore_nonexistent_region: bool,
166    ) -> Result<Vec<RegionId>> {
167        self.inner
168            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
169            .await
170    }
171
172    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
173    pub async fn handle_request(
174        &self,
175        region_id: RegionId,
176        request: RegionRequest,
177    ) -> Result<RegionResponse> {
178        self.inner.handle_request(region_id, request).await
179    }
180
181    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
182    async fn table_provider(
183        &self,
184        region_id: RegionId,
185        ctx: Option<QueryContextRef>,
186    ) -> Result<Arc<dyn TableProvider>> {
187        let status = self
188            .inner
189            .region_map
190            .get(&region_id)
191            .context(RegionNotFoundSnafu { region_id })?
192            .clone();
193        ensure!(
194            matches!(status, RegionEngineWithStatus::Ready(_)),
195            RegionNotReadySnafu { region_id }
196        );
197
198        self.inner
199            .table_provider_factory
200            .create(region_id, status.into_engine(), ctx)
201            .await
202            .context(ExecuteLogicalPlanSnafu)
203    }
204
205    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
206    pub async fn handle_remote_read(
207        &self,
208        request: api::v1::region::QueryRequest,
209        query_ctx: QueryContextRef,
210    ) -> Result<SendableRecordBatchStream> {
211        let _permit = if let Some(p) = &self.inner.parallelism {
212            Some(p.acquire().await?)
213        } else {
214            None
215        };
216
217        let region_id = RegionId::from_u64(request.region_id);
218        let provider = self
219            .table_provider(region_id, Some(query_ctx.clone()))
220            .await?;
221        let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
222
223        if query_ctx.explain_verbose() {
224            common_telemetry::info!("Handle remote read for region: {}", region_id);
225        }
226
227        let decoder = self
228            .inner
229            .query_engine
230            .engine_context(query_ctx.clone())
231            .new_plan_decoder()
232            .context(NewPlanDecoderSnafu)?;
233
234        let plan = decoder
235            .decode(Bytes::from(request.plan), catalog_list, false)
236            .await
237            .context(DecodeLogicalPlanSnafu)?;
238
239        self.inner
240            .handle_read(
241                QueryRequest {
242                    header: request.header,
243                    region_id,
244                    plan,
245                },
246                query_ctx,
247            )
248            .await
249    }
250
251    #[tracing::instrument(skip_all)]
252    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
253        let _permit = if let Some(p) = &self.inner.parallelism {
254            Some(p.acquire().await?)
255        } else {
256            None
257        };
258
259        let ctx = request.header.as_ref().map(|h| h.into());
260        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
261
262        let provider = self
263            .table_provider(request.region_id, Some(query_ctx.clone()))
264            .await?;
265
266        struct RegionDataSourceInjector {
267            source: Arc<dyn TableSource>,
268        }
269
270        impl TreeNodeRewriter for RegionDataSourceInjector {
271            type Node = LogicalPlan;
272
273            fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
274                Ok(match node {
275                    LogicalPlan::TableScan(mut scan) => {
276                        scan.source = self.source.clone();
277                        Transformed::yes(LogicalPlan::TableScan(scan))
278                    }
279                    _ => Transformed::no(node),
280                })
281            }
282        }
283
284        let plan = request
285            .plan
286            .rewrite(&mut RegionDataSourceInjector {
287                source: provider_as_source(provider),
288            })
289            .context(DataFusionSnafu)?
290            .data;
291
292        self.inner
293            .handle_read(QueryRequest { plan, ..request }, query_ctx)
294            .await
295    }
296
297    /// Returns all opened and reportable regions.
298    ///
299    /// Notes: except all metrics regions.
300    pub fn reportable_regions(&self) -> Vec<RegionStat> {
301        self.inner
302            .region_map
303            .iter()
304            .filter_map(|e| {
305                let region_id = *e.key();
306                // Filters out any regions whose role equals None.
307                e.role(region_id).map(|role| RegionStat {
308                    region_id,
309                    engine: e.value().name().to_string(),
310                    role,
311                })
312            })
313            .collect()
314    }
315
316    /// Returns the reportable topics.
317    pub fn topic_stats(&self) -> Vec<TopicStat> {
318        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
319        let Some(reporter) = reporter.as_mut() else {
320            return vec![];
321        };
322        reporter
323            .reportable_topics()
324            .into_iter()
325            .map(|stat| TopicStat {
326                topic_name: stat.topic,
327                record_size: stat.record_size,
328                record_num: stat.record_num,
329                latest_entry_id: stat.latest_entry_id,
330            })
331            .collect()
332    }
333
334    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
335        self.inner.region_map.get(&region_id).and_then(|engine| {
336            engine.role(region_id).map(|role| match role {
337                RegionRole::Follower => false,
338                RegionRole::Leader => true,
339                RegionRole::DowngradingLeader => true,
340            })
341        })
342    }
343
344    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
345        let engine = self
346            .inner
347            .region_map
348            .get(&region_id)
349            .with_context(|| RegionNotFoundSnafu { region_id })?;
350        engine
351            .set_region_role(region_id, role)
352            .with_context(|_| HandleRegionRequestSnafu { region_id })
353    }
354
355    /// Set region role state gracefully.
356    ///
357    /// For [SettableRegionRoleState::Follower]:
358    /// After the call returns, the engine ensures that
359    /// no **further** write or flush operations will succeed in this region.
360    ///
361    /// For [SettableRegionRoleState::DowngradingLeader]:
362    /// After the call returns, the engine ensures that
363    /// no **further** write operations will succeed in this region.
364    pub async fn set_region_role_state_gracefully(
365        &self,
366        region_id: RegionId,
367        state: SettableRegionRoleState,
368    ) -> Result<SetRegionRoleStateResponse> {
369        match self.inner.region_map.get(&region_id) {
370            Some(engine) => Ok(engine
371                .set_region_role_state_gracefully(region_id, state)
372                .await
373                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
374            None => Ok(SetRegionRoleStateResponse::NotFound),
375        }
376    }
377
378    pub fn runtime(&self) -> Runtime {
379        self.inner.runtime.clone()
380    }
381
382    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
383        match self.inner.region_map.get(&region_id) {
384            Some(e) => e.region_statistic(region_id),
385            None => None,
386        }
387    }
388
389    /// Stop the region server.
390    pub async fn stop(&self) -> Result<()> {
391        self.inner.stop().await
392    }
393
394    #[cfg(test)]
395    /// Registers a region for test purpose.
396    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
397        self.inner
398            .region_map
399            .insert(region_id, RegionEngineWithStatus::Ready(engine));
400    }
401
402    async fn handle_batch_ddl_requests(
403        &self,
404        request: region_request::Body,
405    ) -> Result<RegionResponse> {
406        // Safety: we have already checked the request type in `RegionServer::handle()`.
407        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
408            .context(BuildRegionRequestsSnafu)?
409            .unwrap();
410        let tracing_context = TracingContext::from_current_span();
411
412        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
413        self.inner
414            .handle_batch_request(batch_request)
415            .trace(span)
416            .await
417    }
418
419    async fn handle_requests_in_parallel(
420        &self,
421        request: region_request::Body,
422    ) -> Result<RegionResponse> {
423        let requests =
424            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
425        let tracing_context = TracingContext::from_current_span();
426
427        let join_tasks = requests.into_iter().map(|(region_id, req)| {
428            let self_to_move = self;
429            let span = tracing_context.attach(info_span!(
430                "RegionServer::handle_region_request",
431                region_id = region_id.to_string()
432            ));
433            async move {
434                self_to_move
435                    .handle_request(region_id, req)
436                    .trace(span)
437                    .await
438            }
439        });
440
441        let results = try_join_all(join_tasks).await?;
442        let mut affected_rows = 0;
443        let mut extensions = HashMap::new();
444        for result in results {
445            affected_rows += result.affected_rows;
446            extensions.extend(result.extensions);
447        }
448
449        Ok(RegionResponse {
450            affected_rows,
451            extensions,
452            metadata: Vec::new(),
453        })
454    }
455
456    async fn handle_requests_in_serial(
457        &self,
458        request: region_request::Body,
459    ) -> Result<RegionResponse> {
460        let requests =
461            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
462        let tracing_context = TracingContext::from_current_span();
463
464        let mut affected_rows = 0;
465        let mut extensions = HashMap::new();
466        // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
467        // modify this part to avoid inefficient serial loop calls.
468        for (region_id, req) in requests {
469            let span = tracing_context.attach(info_span!(
470                "RegionServer::handle_region_request",
471                region_id = region_id.to_string()
472            ));
473            let result = self.handle_request(region_id, req).trace(span).await?;
474
475            affected_rows += result.affected_rows;
476            extensions.extend(result.extensions);
477        }
478
479        Ok(RegionResponse {
480            affected_rows,
481            extensions,
482            metadata: Vec::new(),
483        })
484    }
485
486    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
487        let region_id = RegionId::from_u64(request.region_id);
488        let manifest_info = request
489            .manifest_info
490            .context(error::MissingRequiredFieldSnafu {
491                name: "manifest_info",
492            })?;
493
494        let manifest_info = match manifest_info {
495            ManifestInfo::MitoManifestInfo(info) => {
496                RegionManifestInfo::mito(info.data_manifest_version, 0)
497            }
498            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
499                info.data_manifest_version,
500                0,
501                info.metadata_manifest_version,
502                0,
503            ),
504        };
505
506        let tracing_context = TracingContext::from_current_span();
507        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
508
509        self.sync_region(region_id, manifest_info)
510            .trace(span)
511            .await
512            .map(|_| RegionResponse::new(AffectedRows::default()))
513    }
514
515    /// Handles the ListMetadata request and retrieves metadata for specified regions.
516    ///
517    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
518    /// non-existing regions as `null`.
519    #[tracing::instrument(skip_all)]
520    async fn handle_list_metadata_request(
521        &self,
522        request: &ListMetadataRequest,
523    ) -> Result<RegionResponse> {
524        let mut region_metadatas = Vec::new();
525        // Collect metadata for each region
526        for region_id in &request.region_ids {
527            let region_id = RegionId::from_u64(*region_id);
528            // Get the engine.
529            let Some(engine) = self.find_engine(region_id)? else {
530                region_metadatas.push(None);
531                continue;
532            };
533
534            match engine.get_metadata(region_id).await {
535                Ok(metadata) => region_metadatas.push(Some(metadata)),
536                Err(err) => {
537                    if err.status_code() == StatusCode::RegionNotFound {
538                        region_metadatas.push(None);
539                    } else {
540                        Err(err).with_context(|_| GetRegionMetadataSnafu {
541                            engine: engine.name(),
542                            region_id,
543                        })?;
544                    }
545                }
546            }
547        }
548
549        // Serialize metadata to JSON
550        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
551
552        let response = RegionResponse::from_metadata(json_result);
553
554        Ok(response)
555    }
556
557    /// Sync region manifest and registers new opened logical regions.
558    pub async fn sync_region(
559        &self,
560        region_id: RegionId,
561        manifest_info: RegionManifestInfo,
562    ) -> Result<()> {
563        let engine_with_status = self
564            .inner
565            .region_map
566            .get(&region_id)
567            .with_context(|| RegionNotFoundSnafu { region_id })?;
568
569        self.inner
570            .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
571            .await
572    }
573}
574
575#[async_trait]
576impl RegionServerHandler for RegionServer {
577    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
578        let response = match &request {
579            region_request::Body::Creates(_)
580            | region_request::Body::Drops(_)
581            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
582            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
583                self.handle_requests_in_parallel(request).await
584            }
585            region_request::Body::Sync(sync_request) => {
586                self.handle_sync_region_request(sync_request).await
587            }
588            region_request::Body::ListMetadata(list_metadata_request) => {
589                self.handle_list_metadata_request(list_metadata_request)
590                    .await
591            }
592            _ => self.handle_requests_in_serial(request).await,
593        }
594        .map_err(BoxedError::new)
595        .context(ExecuteGrpcRequestSnafu)?;
596
597        Ok(RegionResponseV1 {
598            header: Some(ResponseHeader {
599                status: Some(Status {
600                    status_code: StatusCode::Success as _,
601                    ..Default::default()
602                }),
603            }),
604            affected_rows: response.affected_rows as _,
605            extensions: response.extensions,
606            metadata: response.metadata,
607        })
608    }
609}
610
611#[async_trait]
612impl FlightCraft for RegionServer {
613    async fn do_get(
614        &self,
615        request: Request<Ticket>,
616    ) -> TonicResult<Response<TonicStream<FlightData>>> {
617        let ticket = request.into_inner().ticket;
618        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
619            .context(servers_error::InvalidFlightTicketSnafu)?;
620        let tracing_context = request
621            .header
622            .as_ref()
623            .map(|h| TracingContext::from_w3c(&h.tracing_context))
624            .unwrap_or_default();
625        let query_ctx = request
626            .header
627            .as_ref()
628            .map(|h| Arc::new(QueryContext::from(h)))
629            .unwrap_or(QueryContext::arc());
630
631        let result = self
632            .handle_remote_read(request, query_ctx.clone())
633            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
634            .await?;
635
636        let stream = Box::pin(FlightRecordBatchStream::new(
637            result,
638            tracing_context,
639            self.flight_compression,
640            query_ctx,
641        ));
642        Ok(Response::new(stream))
643    }
644}
645
646#[derive(Clone)]
647enum RegionEngineWithStatus {
648    // An opening, or creating region.
649    Registering(RegionEngineRef),
650    // A closing, or dropping region.
651    Deregistering(RegionEngineRef),
652    // A ready region.
653    Ready(RegionEngineRef),
654}
655
656impl RegionEngineWithStatus {
657    /// Returns [RegionEngineRef].
658    pub fn into_engine(self) -> RegionEngineRef {
659        match self {
660            RegionEngineWithStatus::Registering(engine) => engine,
661            RegionEngineWithStatus::Deregistering(engine) => engine,
662            RegionEngineWithStatus::Ready(engine) => engine,
663        }
664    }
665
666    /// Returns [RegionEngineRef] reference.
667    pub fn engine(&self) -> &RegionEngineRef {
668        match self {
669            RegionEngineWithStatus::Registering(engine) => engine,
670            RegionEngineWithStatus::Deregistering(engine) => engine,
671            RegionEngineWithStatus::Ready(engine) => engine,
672        }
673    }
674}
675
676impl Deref for RegionEngineWithStatus {
677    type Target = RegionEngineRef;
678
679    fn deref(&self) -> &Self::Target {
680        match self {
681            RegionEngineWithStatus::Registering(engine) => engine,
682            RegionEngineWithStatus::Deregistering(engine) => engine,
683            RegionEngineWithStatus::Ready(engine) => engine,
684        }
685    }
686}
687
688struct RegionServerInner {
689    engines: RwLock<HashMap<String, RegionEngineRef>>,
690    region_map: DashMap<RegionId, RegionEngineWithStatus>,
691    query_engine: QueryEngineRef,
692    runtime: Runtime,
693    event_listener: RegionServerEventListenerRef,
694    table_provider_factory: TableProviderFactoryRef,
695    // The number of queries allowed to be executed at the same time.
696    // Act as last line of defense on datanode to prevent query overloading.
697    parallelism: Option<RegionServerParallelism>,
698    // The topic stats reporter.
699    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
700}
701
702struct RegionServerParallelism {
703    semaphore: Semaphore,
704    timeout: Duration,
705}
706
707impl RegionServerParallelism {
708    pub fn from_opts(
709        max_concurrent_queries: usize,
710        concurrent_query_limiter_timeout: Duration,
711    ) -> Option<Self> {
712        if max_concurrent_queries == 0 {
713            return None;
714        }
715        Some(RegionServerParallelism {
716            semaphore: Semaphore::new(max_concurrent_queries),
717            timeout: concurrent_query_limiter_timeout,
718        })
719    }
720
721    pub async fn acquire(&self) -> Result<SemaphorePermit> {
722        timeout(self.timeout, self.semaphore.acquire())
723            .await
724            .context(ConcurrentQueryLimiterTimeoutSnafu)?
725            .context(ConcurrentQueryLimiterClosedSnafu)
726    }
727}
728
729enum CurrentEngine {
730    Engine(RegionEngineRef),
731    EarlyReturn(AffectedRows),
732}
733
734impl Debug for CurrentEngine {
735    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736        match self {
737            CurrentEngine::Engine(engine) => f
738                .debug_struct("CurrentEngine")
739                .field("engine", &engine.name())
740                .finish(),
741            CurrentEngine::EarlyReturn(rows) => f
742                .debug_struct("CurrentEngine")
743                .field("return", rows)
744                .finish(),
745        }
746    }
747}
748
749impl RegionServerInner {
750    pub fn new(
751        query_engine: QueryEngineRef,
752        runtime: Runtime,
753        event_listener: RegionServerEventListenerRef,
754        table_provider_factory: TableProviderFactoryRef,
755        parallelism: Option<RegionServerParallelism>,
756    ) -> Self {
757        Self {
758            engines: RwLock::new(HashMap::new()),
759            region_map: DashMap::new(),
760            query_engine,
761            runtime,
762            event_listener,
763            table_provider_factory,
764            parallelism,
765            topic_stats_reporter: RwLock::new(None),
766        }
767    }
768
769    pub fn register_engine(&self, engine: RegionEngineRef) {
770        let engine_name = engine.name();
771        info!("Region Engine {engine_name} is registered");
772        self.engines
773            .write()
774            .unwrap()
775            .insert(engine_name.to_string(), engine);
776    }
777
778    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
779        info!("Set topic stats reporter");
780        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
781    }
782
783    fn get_engine(
784        &self,
785        region_id: RegionId,
786        region_change: &RegionChange,
787    ) -> Result<CurrentEngine> {
788        let current_region_status = self.region_map.get(&region_id);
789
790        let engine = match region_change {
791            RegionChange::Register(attribute) => match current_region_status {
792                Some(status) => match status.clone() {
793                    RegionEngineWithStatus::Registering(engine) => engine,
794                    RegionEngineWithStatus::Deregistering(_) => {
795                        return error::RegionBusySnafu { region_id }.fail()
796                    }
797                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
798                },
799                _ => self
800                    .engines
801                    .read()
802                    .unwrap()
803                    .get(attribute.engine())
804                    .with_context(|| RegionEngineNotFoundSnafu {
805                        name: attribute.engine(),
806                    })?
807                    .clone(),
808            },
809            RegionChange::Deregisters => match current_region_status {
810                Some(status) => match status.clone() {
811                    RegionEngineWithStatus::Registering(_) => {
812                        return error::RegionBusySnafu { region_id }.fail()
813                    }
814                    RegionEngineWithStatus::Deregistering(_) => {
815                        return Ok(CurrentEngine::EarlyReturn(0))
816                    }
817                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
818                },
819                None => return Ok(CurrentEngine::EarlyReturn(0)),
820            },
821            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
822                match current_region_status {
823                    Some(status) => match status.clone() {
824                        RegionEngineWithStatus::Registering(_) => {
825                            return error::RegionNotReadySnafu { region_id }.fail()
826                        }
827                        RegionEngineWithStatus::Deregistering(_) => {
828                            return error::RegionNotFoundSnafu { region_id }.fail()
829                        }
830                        RegionEngineWithStatus::Ready(engine) => engine,
831                    },
832                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
833                }
834            }
835        };
836
837        Ok(CurrentEngine::Engine(engine))
838    }
839
840    async fn handle_batch_open_requests_inner(
841        &self,
842        engine: RegionEngineRef,
843        parallelism: usize,
844        requests: Vec<(RegionId, RegionOpenRequest)>,
845        ignore_nonexistent_region: bool,
846    ) -> Result<Vec<RegionId>> {
847        let region_changes = requests
848            .iter()
849            .map(|(region_id, open)| {
850                let attribute = parse_region_attribute(&open.engine, &open.options)?;
851                Ok((*region_id, RegionChange::Register(attribute)))
852            })
853            .collect::<Result<HashMap<_, _>>>()?;
854
855        for (&region_id, region_change) in &region_changes {
856            self.set_region_status_not_ready(region_id, &engine, region_change)
857        }
858
859        let mut open_regions = Vec::with_capacity(requests.len());
860        let mut errors = vec![];
861        match engine
862            .handle_batch_open_requests(parallelism, requests)
863            .await
864            .with_context(|_| HandleBatchOpenRequestSnafu)
865        {
866            Ok(results) => {
867                for (region_id, result) in results {
868                    let region_change = &region_changes[&region_id];
869                    match result {
870                        Ok(_) => {
871                            if let Err(e) = self
872                                .set_region_status_ready(region_id, engine.clone(), *region_change)
873                                .await
874                            {
875                                error!(e; "Failed to set region to ready: {}", region_id);
876                                errors.push(BoxedError::new(e));
877                            } else {
878                                open_regions.push(region_id)
879                            }
880                        }
881                        Err(e) => {
882                            self.unset_region_status(region_id, &engine, *region_change);
883                            if e.status_code() == StatusCode::RegionNotFound
884                                && ignore_nonexistent_region
885                            {
886                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
887                            } else {
888                                error!(e; "Failed to open region: {}", region_id);
889                                errors.push(e);
890                            }
891                        }
892                    }
893                }
894            }
895            Err(e) => {
896                for (&region_id, region_change) in &region_changes {
897                    self.unset_region_status(region_id, &engine, *region_change);
898                }
899                error!(e; "Failed to open batch regions");
900                errors.push(BoxedError::new(e));
901            }
902        }
903
904        if !errors.is_empty() {
905            return error::UnexpectedSnafu {
906                // Returns the first error.
907                violated: format!("Failed to open batch regions: {:?}", errors[0]),
908            }
909            .fail();
910        }
911
912        Ok(open_regions)
913    }
914
915    pub async fn handle_batch_open_requests(
916        &self,
917        parallelism: usize,
918        requests: Vec<(RegionId, RegionOpenRequest)>,
919        ignore_nonexistent_region: bool,
920    ) -> Result<Vec<RegionId>> {
921        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
922            HashMap::with_capacity(requests.len());
923        for (region_id, request) in requests {
924            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
925                requests.push((region_id, request));
926            } else {
927                engine_grouped_requests
928                    .insert(request.engine.to_string(), vec![(region_id, request)]);
929            }
930        }
931
932        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
933        for (engine, requests) in engine_grouped_requests {
934            let engine = self
935                .engines
936                .read()
937                .unwrap()
938                .get(&engine)
939                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
940                .clone();
941            results.push(
942                self.handle_batch_open_requests_inner(
943                    engine,
944                    parallelism,
945                    requests,
946                    ignore_nonexistent_region,
947                )
948                .await,
949            )
950        }
951
952        Ok(results
953            .into_iter()
954            .collect::<Result<Vec<_>>>()?
955            .into_iter()
956            .flatten()
957            .collect::<Vec<_>>())
958    }
959
960    // Handle requests in batch.
961    //
962    // limitation: all create requests must be in the same engine.
963    pub async fn handle_batch_request(
964        &self,
965        batch_request: BatchRegionDdlRequest,
966    ) -> Result<RegionResponse> {
967        let region_changes = match &batch_request {
968            BatchRegionDdlRequest::Create(requests) => requests
969                .iter()
970                .map(|(region_id, create)| {
971                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
972                    Ok((*region_id, RegionChange::Register(attribute)))
973                })
974                .collect::<Result<Vec<_>>>()?,
975            BatchRegionDdlRequest::Drop(requests) => requests
976                .iter()
977                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
978                .collect::<Vec<_>>(),
979            BatchRegionDdlRequest::Alter(requests) => requests
980                .iter()
981                .map(|(region_id, _)| (*region_id, RegionChange::None))
982                .collect::<Vec<_>>(),
983        };
984
985        // The ddl procedure will ensure all requests are in the same engine.
986        // Therefore, we can get the engine from the first request.
987        let (first_region_id, first_region_change) = region_changes.first().unwrap();
988        let engine = match self.get_engine(*first_region_id, first_region_change)? {
989            CurrentEngine::Engine(engine) => engine,
990            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
991        };
992
993        for (region_id, region_change) in region_changes.iter() {
994            self.set_region_status_not_ready(*region_id, &engine, region_change);
995        }
996
997        let ddl_type = batch_request.request_type();
998        let result = engine
999            .handle_batch_ddl_requests(batch_request)
1000            .await
1001            .context(HandleBatchDdlRequestSnafu { ddl_type });
1002
1003        match result {
1004            Ok(result) => {
1005                for (region_id, region_change) in &region_changes {
1006                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1007                        .await?;
1008                }
1009
1010                Ok(RegionResponse {
1011                    affected_rows: result.affected_rows,
1012                    extensions: result.extensions,
1013                    metadata: Vec::new(),
1014                })
1015            }
1016            Err(err) => {
1017                for (region_id, region_change) in region_changes {
1018                    self.unset_region_status(region_id, &engine, region_change);
1019                }
1020
1021                Err(err)
1022            }
1023        }
1024    }
1025
1026    pub async fn handle_request(
1027        &self,
1028        region_id: RegionId,
1029        request: RegionRequest,
1030    ) -> Result<RegionResponse> {
1031        let request_type = request.request_type();
1032        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1033            .with_label_values(&[request_type])
1034            .start_timer();
1035
1036        let region_change = match &request {
1037            RegionRequest::Create(create) => {
1038                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1039                RegionChange::Register(attribute)
1040            }
1041            RegionRequest::Open(open) => {
1042                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1043                RegionChange::Register(attribute)
1044            }
1045            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1046            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1047                RegionChange::Ingest
1048            }
1049            RegionRequest::Alter(_)
1050            | RegionRequest::Flush(_)
1051            | RegionRequest::Compact(_)
1052            | RegionRequest::Truncate(_) => RegionChange::None,
1053            RegionRequest::Catchup(_) => RegionChange::Catchup,
1054        };
1055
1056        let engine = match self.get_engine(region_id, &region_change)? {
1057            CurrentEngine::Engine(engine) => engine,
1058            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1059        };
1060
1061        // Sets corresponding region status to registering/deregistering before the operation.
1062        self.set_region_status_not_ready(region_id, &engine, &region_change);
1063
1064        match engine
1065            .handle_request(region_id, request)
1066            .await
1067            .with_context(|_| HandleRegionRequestSnafu { region_id })
1068        {
1069            Ok(result) => {
1070                // Update metrics
1071                if matches!(region_change, RegionChange::Ingest) {
1072                    crate::metrics::REGION_CHANGED_ROW_COUNT
1073                        .with_label_values(&[request_type])
1074                        .inc_by(result.affected_rows as u64);
1075                }
1076                // Sets corresponding region status to ready.
1077                self.set_region_status_ready(region_id, engine.clone(), region_change)
1078                    .await?;
1079
1080                Ok(RegionResponse {
1081                    affected_rows: result.affected_rows,
1082                    extensions: result.extensions,
1083                    metadata: Vec::new(),
1084                })
1085            }
1086            Err(err) => {
1087                // Removes the region status if the operation fails.
1088                self.unset_region_status(region_id, &engine, region_change);
1089                Err(err)
1090            }
1091        }
1092    }
1093
1094    /// Handles the sync region request.
1095    pub async fn handle_sync_region(
1096        &self,
1097        engine: &RegionEngineRef,
1098        region_id: RegionId,
1099        manifest_info: RegionManifestInfo,
1100    ) -> Result<()> {
1101        let Some(new_opened_regions) = engine
1102            .sync_region(region_id, manifest_info)
1103            .await
1104            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1105            .new_opened_logical_region_ids()
1106        else {
1107            warn!("No new opened logical regions");
1108            return Ok(());
1109        };
1110
1111        for region in new_opened_regions {
1112            self.region_map
1113                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1114            info!("Logical region {} is registered!", region);
1115        }
1116
1117        Ok(())
1118    }
1119
1120    fn set_region_status_not_ready(
1121        &self,
1122        region_id: RegionId,
1123        engine: &RegionEngineRef,
1124        region_change: &RegionChange,
1125    ) {
1126        match region_change {
1127            RegionChange::Register(_) => {
1128                self.region_map.insert(
1129                    region_id,
1130                    RegionEngineWithStatus::Registering(engine.clone()),
1131                );
1132            }
1133            RegionChange::Deregisters => {
1134                self.region_map.insert(
1135                    region_id,
1136                    RegionEngineWithStatus::Deregistering(engine.clone()),
1137                );
1138            }
1139            _ => {}
1140        }
1141    }
1142
1143    fn unset_region_status(
1144        &self,
1145        region_id: RegionId,
1146        engine: &RegionEngineRef,
1147        region_change: RegionChange,
1148    ) {
1149        match region_change {
1150            RegionChange::None | RegionChange::Ingest => {}
1151            RegionChange::Register(_) => {
1152                self.region_map.remove(&region_id);
1153            }
1154            RegionChange::Deregisters => {
1155                self.region_map
1156                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1157            }
1158            RegionChange::Catchup => {}
1159        }
1160    }
1161
1162    async fn set_region_status_ready(
1163        &self,
1164        region_id: RegionId,
1165        engine: RegionEngineRef,
1166        region_change: RegionChange,
1167    ) -> Result<()> {
1168        let engine_type = engine.name();
1169        match region_change {
1170            RegionChange::None | RegionChange::Ingest => {}
1171            RegionChange::Register(attribute) => {
1172                info!(
1173                    "Region {region_id} is registered to engine {}",
1174                    attribute.engine()
1175                );
1176                self.region_map
1177                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1178
1179                match attribute {
1180                    RegionAttribute::Metric { physical } => {
1181                        if physical {
1182                            // Registers the logical regions belong to the physical region (`region_id`).
1183                            self.register_logical_regions(&engine, region_id).await?;
1184                            // We only send the `on_region_registered` event of the physical region.
1185                            self.event_listener.on_region_registered(region_id);
1186                        }
1187                    }
1188                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1189                    RegionAttribute::File => {
1190                        // do nothing
1191                    }
1192                }
1193            }
1194            RegionChange::Deregisters => {
1195                info!("Region {region_id} is deregistered from engine {engine_type}");
1196                self.region_map
1197                    .remove(&region_id)
1198                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1199                self.event_listener.on_region_deregistered(region_id);
1200            }
1201            RegionChange::Catchup => {
1202                if is_metric_engine(engine.name()) {
1203                    // Registers the logical regions belong to the physical region (`region_id`).
1204                    self.register_logical_regions(&engine, region_id).await?;
1205                }
1206            }
1207        }
1208        Ok(())
1209    }
1210
1211    async fn register_logical_regions(
1212        &self,
1213        engine: &RegionEngineRef,
1214        physical_region_id: RegionId,
1215    ) -> Result<()> {
1216        let metric_engine =
1217            engine
1218                .as_any()
1219                .downcast_ref::<MetricEngine>()
1220                .context(UnexpectedSnafu {
1221                    violated: format!(
1222                        "expecting engine type '{}', actual '{}'",
1223                        METRIC_ENGINE_NAME,
1224                        engine.name(),
1225                    ),
1226                })?;
1227
1228        let logical_regions = metric_engine
1229            .logical_regions(physical_region_id)
1230            .await
1231            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1232
1233        for region in logical_regions {
1234            self.region_map
1235                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1236            info!("Logical region {} is registered!", region);
1237        }
1238        Ok(())
1239    }
1240
1241    pub async fn handle_read(
1242        &self,
1243        request: QueryRequest,
1244        query_ctx: QueryContextRef,
1245    ) -> Result<SendableRecordBatchStream> {
1246        // TODO(ruihang): add metrics and set trace id
1247
1248        let result = self
1249            .query_engine
1250            .execute(request.plan, query_ctx)
1251            .await
1252            .context(ExecuteLogicalPlanSnafu)?;
1253
1254        match result.data {
1255            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1256                UnsupportedOutputSnafu { expected: "stream" }.fail()
1257            }
1258            OutputData::Stream(stream) => Ok(stream),
1259        }
1260    }
1261
1262    async fn stop(&self) -> Result<()> {
1263        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1264        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1265        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1266        //
1267        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1268        // it here, collect the values first then use later separately.
1269
1270        let regions = self
1271            .region_map
1272            .iter()
1273            .map(|x| (*x.key(), x.value().clone()))
1274            .collect::<Vec<_>>();
1275        let num_regions = regions.len();
1276
1277        for (region_id, engine) in regions {
1278            let closed = engine
1279                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1280                .await;
1281            match closed {
1282                Ok(_) => debug!("Region {region_id} is closed"),
1283                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1284            }
1285        }
1286        self.region_map.clear();
1287        info!("closed {num_regions} regions");
1288
1289        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1290        for (engine_name, engine) in engines {
1291            engine
1292                .stop()
1293                .await
1294                .context(StopRegionEngineSnafu { name: &engine_name })?;
1295            info!("Region engine {engine_name} is stopped");
1296        }
1297
1298        Ok(())
1299    }
1300}
1301
/// The effect a region request has on the region map, derived from the request type in
/// `handle_request` and `handle_batch_request`.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// No registration change (alter, flush, compact, truncate requests).
    None,
    /// The region is being created or opened; carries the parsed region attribute.
    Register(RegionAttribute),
    /// The region is being closed or dropped.
    Deregisters,
    /// The region is catching up; for the metric engine its logical regions are registered
    /// once the catchup succeeds.
    Catchup,
    /// Rows are written to or deleted from the region (put, delete, bulk insert requests).
    Ingest,
}
1310
/// Returns `true` when `engine` is the metric engine's name.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1314
1315fn parse_region_attribute(
1316    engine: &str,
1317    options: &HashMap<String, String>,
1318) -> Result<RegionAttribute> {
1319    match engine {
1320        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1321        METRIC_ENGINE_NAME => {
1322            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1323
1324            Ok(RegionAttribute::Metric { physical })
1325        }
1326        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1327        _ => error::UnexpectedSnafu {
1328            violated: format!("Unknown engine: {}", engine),
1329        }
1330        .fail(),
1331    }
1332}
1333
/// Engine-specific attributes of a region, produced by `parse_region_attribute`.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region of the mito engine.
    Mito,
    /// A region of the metric engine. `physical` is `false` for logical regions, i.e. those
    /// whose options contain `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region of the file engine.
    File,
}
1340
1341impl RegionAttribute {
1342    fn engine(&self) -> &'static str {
1343        match self {
1344            RegionAttribute::Mito => MITO_ENGINE_NAME,
1345            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1346            RegionAttribute::File => FILE_ENGINE_NAME,
1347        }
1348    }
1349}
1350
1351#[cfg(test)]
1352mod tests {
1353
1354    use std::assert_matches::assert_matches;
1355
1356    use api::v1::SemanticType;
1357    use common_error::ext::ErrorExt;
1358    use datatypes::prelude::ConcreteDataType;
1359    use mito2::test_util::CreateRequestBuilder;
1360    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1361    use store_api::region_engine::RegionEngine;
1362    use store_api::region_request::{
1363        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1364    };
1365    use store_api::storage::RegionId;
1366
1367    use super::*;
1368    use crate::error::Result;
1369    use crate::tests::{mock_region_server, MockRegionEngine};
1370
1371    #[tokio::test]
1372    async fn test_region_registering() {
1373        common_telemetry::init_default_ut_logging();
1374
1375        let mut mock_region_server = mock_region_server();
1376        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1377        let engine_name = engine.name();
1378        mock_region_server.register_engine(engine.clone());
1379        let region_id = RegionId::new(1, 1);
1380        let builder = CreateRequestBuilder::new();
1381        let create_req = builder.build();
1382        // Tries to create/open a registering region.
1383        mock_region_server.inner.region_map.insert(
1384            region_id,
1385            RegionEngineWithStatus::Registering(engine.clone()),
1386        );
1387        let response = mock_region_server
1388            .handle_request(region_id, RegionRequest::Create(create_req))
1389            .await
1390            .unwrap();
1391        assert_eq!(response.affected_rows, 0);
1392        let status = mock_region_server
1393            .inner
1394            .region_map
1395            .get(&region_id)
1396            .unwrap()
1397            .clone();
1398        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1399
1400        mock_region_server.inner.region_map.insert(
1401            region_id,
1402            RegionEngineWithStatus::Registering(engine.clone()),
1403        );
1404        let response = mock_region_server
1405            .handle_request(
1406                region_id,
1407                RegionRequest::Open(RegionOpenRequest {
1408                    engine: engine_name.to_string(),
1409                    table_dir: String::new(),
1410                    path_type: PathType::Bare,
1411                    options: Default::default(),
1412                    skip_wal_replay: false,
1413                    checkpoint: None,
1414                }),
1415            )
1416            .await
1417            .unwrap();
1418        assert_eq!(response.affected_rows, 0);
1419        let status = mock_region_server
1420            .inner
1421            .region_map
1422            .get(&region_id)
1423            .unwrap()
1424            .clone();
1425        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1426    }
1427
1428    #[tokio::test]
1429    async fn test_region_deregistering() {
1430        common_telemetry::init_default_ut_logging();
1431
1432        let mut mock_region_server = mock_region_server();
1433        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1434
1435        mock_region_server.register_engine(engine.clone());
1436
1437        let region_id = RegionId::new(1, 1);
1438
1439        // Tries to drop/close a registering region.
1440        mock_region_server.inner.region_map.insert(
1441            region_id,
1442            RegionEngineWithStatus::Deregistering(engine.clone()),
1443        );
1444
1445        let response = mock_region_server
1446            .handle_request(
1447                region_id,
1448                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1449            )
1450            .await
1451            .unwrap();
1452        assert_eq!(response.affected_rows, 0);
1453
1454        let status = mock_region_server
1455            .inner
1456            .region_map
1457            .get(&region_id)
1458            .unwrap()
1459            .clone();
1460        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1461
1462        mock_region_server.inner.region_map.insert(
1463            region_id,
1464            RegionEngineWithStatus::Deregistering(engine.clone()),
1465        );
1466
1467        let response = mock_region_server
1468            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1469            .await
1470            .unwrap();
1471        assert_eq!(response.affected_rows, 0);
1472
1473        let status = mock_region_server
1474            .inner
1475            .region_map
1476            .get(&region_id)
1477            .unwrap()
1478            .clone();
1479        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1480    }
1481
1482    #[tokio::test]
1483    async fn test_region_not_ready() {
1484        common_telemetry::init_default_ut_logging();
1485
1486        let mut mock_region_server = mock_region_server();
1487        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1488
1489        mock_region_server.register_engine(engine.clone());
1490
1491        let region_id = RegionId::new(1, 1);
1492
1493        // Tries to drop/close a registering region.
1494        mock_region_server.inner.region_map.insert(
1495            region_id,
1496            RegionEngineWithStatus::Registering(engine.clone()),
1497        );
1498
1499        let err = mock_region_server
1500            .handle_request(
1501                region_id,
1502                RegionRequest::Truncate(RegionTruncateRequest::All),
1503            )
1504            .await
1505            .unwrap_err();
1506
1507        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1508    }
1509
1510    #[tokio::test]
1511    async fn test_region_request_failed() {
1512        common_telemetry::init_default_ut_logging();
1513
1514        let mut mock_region_server = mock_region_server();
1515        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1516            MITO_ENGINE_NAME,
1517            Box::new(|_region_id, _request| {
1518                error::UnexpectedSnafu {
1519                    violated: "test".to_string(),
1520                }
1521                .fail()
1522            }),
1523        );
1524
1525        mock_region_server.register_engine(engine.clone());
1526
1527        let region_id = RegionId::new(1, 1);
1528        let builder = CreateRequestBuilder::new();
1529        let create_req = builder.build();
1530        mock_region_server
1531            .handle_request(region_id, RegionRequest::Create(create_req))
1532            .await
1533            .unwrap_err();
1534
1535        let status = mock_region_server.inner.region_map.get(&region_id);
1536        assert!(status.is_none());
1537
1538        mock_region_server
1539            .inner
1540            .region_map
1541            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1542
1543        mock_region_server
1544            .handle_request(
1545                region_id,
1546                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1547            )
1548            .await
1549            .unwrap_err();
1550
1551        let status = mock_region_server.inner.region_map.get(&region_id);
1552        assert!(status.is_some());
1553    }
1554
1555    #[tokio::test]
1556    async fn test_batch_open_region_ignore_nonexistent_regions() {
1557        common_telemetry::init_default_ut_logging();
1558        let mut mock_region_server = mock_region_server();
1559        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1560            MITO_ENGINE_NAME,
1561            Box::new(|region_id, _request| {
1562                if region_id == RegionId::new(1, 1) {
1563                    error::RegionNotFoundSnafu { region_id }.fail()
1564                } else {
1565                    Ok(0)
1566                }
1567            }),
1568        );
1569        mock_region_server.register_engine(engine.clone());
1570
1571        let region_ids = mock_region_server
1572            .handle_batch_open_requests(
1573                8,
1574                vec![
1575                    (
1576                        RegionId::new(1, 1),
1577                        RegionOpenRequest {
1578                            engine: MITO_ENGINE_NAME.to_string(),
1579                            table_dir: String::new(),
1580                            path_type: PathType::Bare,
1581                            options: Default::default(),
1582                            skip_wal_replay: false,
1583                            checkpoint: None,
1584                        },
1585                    ),
1586                    (
1587                        RegionId::new(1, 2),
1588                        RegionOpenRequest {
1589                            engine: MITO_ENGINE_NAME.to_string(),
1590                            table_dir: String::new(),
1591                            path_type: PathType::Bare,
1592                            options: Default::default(),
1593                            skip_wal_replay: false,
1594                            checkpoint: None,
1595                        },
1596                    ),
1597                ],
1598                true,
1599            )
1600            .await
1601            .unwrap();
1602        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1603
1604        let err = mock_region_server
1605            .handle_batch_open_requests(
1606                8,
1607                vec![
1608                    (
1609                        RegionId::new(1, 1),
1610                        RegionOpenRequest {
1611                            engine: MITO_ENGINE_NAME.to_string(),
1612                            table_dir: String::new(),
1613                            path_type: PathType::Bare,
1614                            options: Default::default(),
1615                            skip_wal_replay: false,
1616                            checkpoint: None,
1617                        },
1618                    ),
1619                    (
1620                        RegionId::new(1, 2),
1621                        RegionOpenRequest {
1622                            engine: MITO_ENGINE_NAME.to_string(),
1623                            table_dir: String::new(),
1624                            path_type: PathType::Bare,
1625                            options: Default::default(),
1626                            skip_wal_replay: false,
1627                            checkpoint: None,
1628                        },
1629                    ),
1630                ],
1631                false,
1632            )
1633            .await
1634            .unwrap_err();
1635        assert_eq!(err.status_code(), StatusCode::Unexpected);
1636    }
1637
1638    struct CurrentEngineTest {
1639        region_id: RegionId,
1640        current_region_status: Option<RegionEngineWithStatus>,
1641        region_change: RegionChange,
1642        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1643    }
1644
1645    #[tokio::test]
1646    async fn test_current_engine() {
1647        common_telemetry::init_default_ut_logging();
1648
1649        let mut mock_region_server = mock_region_server();
1650        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1651        mock_region_server.register_engine(engine.clone());
1652
1653        let region_id = RegionId::new(1024, 1);
1654        let tests = vec![
1655            // RegionChange::None
1656            CurrentEngineTest {
1657                region_id,
1658                current_region_status: None,
1659                region_change: RegionChange::None,
1660                assert: Box::new(|result| {
1661                    let err = result.unwrap_err();
1662                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1663                }),
1664            },
1665            CurrentEngineTest {
1666                region_id,
1667                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1668                region_change: RegionChange::None,
1669                assert: Box::new(|result| {
1670                    let current_engine = result.unwrap();
1671                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1672                }),
1673            },
1674            CurrentEngineTest {
1675                region_id,
1676                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1677                region_change: RegionChange::None,
1678                assert: Box::new(|result| {
1679                    let err = result.unwrap_err();
1680                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1681                }),
1682            },
1683            CurrentEngineTest {
1684                region_id,
1685                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1686                region_change: RegionChange::None,
1687                assert: Box::new(|result| {
1688                    let err = result.unwrap_err();
1689                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1690                }),
1691            },
1692            // RegionChange::Register
1693            CurrentEngineTest {
1694                region_id,
1695                current_region_status: None,
1696                region_change: RegionChange::Register(RegionAttribute::Mito),
1697                assert: Box::new(|result| {
1698                    let current_engine = result.unwrap();
1699                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1700                }),
1701            },
1702            CurrentEngineTest {
1703                region_id,
1704                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1705                region_change: RegionChange::Register(RegionAttribute::Mito),
1706                assert: Box::new(|result| {
1707                    let current_engine = result.unwrap();
1708                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1709                }),
1710            },
1711            CurrentEngineTest {
1712                region_id,
1713                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1714                region_change: RegionChange::Register(RegionAttribute::Mito),
1715                assert: Box::new(|result| {
1716                    let err = result.unwrap_err();
1717                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1718                }),
1719            },
1720            CurrentEngineTest {
1721                region_id,
1722                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1723                region_change: RegionChange::Register(RegionAttribute::Mito),
1724                assert: Box::new(|result| {
1725                    let current_engine = result.unwrap();
1726                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1727                }),
1728            },
1729            // RegionChange::Deregister
1730            CurrentEngineTest {
1731                region_id,
1732                current_region_status: None,
1733                region_change: RegionChange::Deregisters,
1734                assert: Box::new(|result| {
1735                    let current_engine = result.unwrap();
1736                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1737                }),
1738            },
1739            CurrentEngineTest {
1740                region_id,
1741                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1742                region_change: RegionChange::Deregisters,
1743                assert: Box::new(|result| {
1744                    let err = result.unwrap_err();
1745                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1746                }),
1747            },
1748            CurrentEngineTest {
1749                region_id,
1750                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1751                region_change: RegionChange::Deregisters,
1752                assert: Box::new(|result| {
1753                    let current_engine = result.unwrap();
1754                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1755                }),
1756            },
1757            CurrentEngineTest {
1758                region_id,
1759                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1760                region_change: RegionChange::Deregisters,
1761                assert: Box::new(|result| {
1762                    let current_engine = result.unwrap();
1763                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1764                }),
1765            },
1766        ];
1767
1768        for test in tests {
1769            let CurrentEngineTest {
1770                region_id,
1771                current_region_status,
1772                region_change,
1773                assert,
1774            } = test;
1775
1776            // Sets up
1777            if let Some(status) = current_region_status {
1778                mock_region_server
1779                    .inner
1780                    .region_map
1781                    .insert(region_id, status);
1782            } else {
1783                mock_region_server.inner.region_map.remove(&region_id);
1784            }
1785
1786            let result = mock_region_server
1787                .inner
1788                .get_engine(region_id, &region_change);
1789
1790            assert(result);
1791        }
1792    }
1793
1794    #[tokio::test]
1795    async fn test_region_server_parallelism() {
1796        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1797        let first_query = p.acquire().await;
1798        assert!(first_query.is_ok());
1799        let second_query = p.acquire().await;
1800        assert!(second_query.is_ok());
1801        let third_query = p.acquire().await;
1802        assert!(third_query.is_err());
1803        let err = third_query.unwrap_err();
1804        assert_eq!(
1805            err.output_msg(),
1806            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1807        );
1808        drop(first_query);
1809        let forth_query = p.acquire().await;
1810        assert!(forth_query.is_ok());
1811    }
1812
1813    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1814        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1815        metadata_builder.push_column_metadata(ColumnMetadata {
1816            column_schema: datatypes::schema::ColumnSchema::new(
1817                "timestamp",
1818                ConcreteDataType::timestamp_nanosecond_datatype(),
1819                false,
1820            ),
1821            semantic_type: SemanticType::Timestamp,
1822            column_id: 0,
1823        });
1824        metadata_builder.push_column_metadata(ColumnMetadata {
1825            column_schema: datatypes::schema::ColumnSchema::new(
1826                "file",
1827                ConcreteDataType::string_datatype(),
1828                true,
1829            ),
1830            semantic_type: SemanticType::Tag,
1831            column_id: 1,
1832        });
1833        metadata_builder.push_column_metadata(ColumnMetadata {
1834            column_schema: datatypes::schema::ColumnSchema::new(
1835                "message",
1836                ConcreteDataType::string_datatype(),
1837                true,
1838            ),
1839            semantic_type: SemanticType::Field,
1840            column_id: 2,
1841        });
1842        metadata_builder.primary_key(vec![1]);
1843        metadata_builder.build().unwrap()
1844    }
1845
1846    #[tokio::test]
1847    async fn test_handle_list_metadata_request() {
1848        common_telemetry::init_default_ut_logging();
1849
1850        let mut mock_region_server = mock_region_server();
1851        let region_id_1 = RegionId::new(1, 0);
1852        let region_id_2 = RegionId::new(2, 0);
1853
1854        let metadata_1 = mock_region_metadata(region_id_1);
1855        let metadata_2 = mock_region_metadata(region_id_2);
1856        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
1857
1858        let metadata_1 = Arc::new(metadata_1);
1859        let metadata_2 = Arc::new(metadata_2);
1860        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1861            MITO_ENGINE_NAME,
1862            Box::new(move |region_id| {
1863                if region_id == region_id_1 {
1864                    Ok(metadata_1.clone())
1865                } else if region_id == region_id_2 {
1866                    Ok(metadata_2.clone())
1867                } else {
1868                    error::RegionNotFoundSnafu { region_id }.fail()
1869                }
1870            }),
1871        );
1872
1873        mock_region_server.register_engine(engine.clone());
1874        mock_region_server
1875            .inner
1876            .region_map
1877            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1878        mock_region_server
1879            .inner
1880            .region_map
1881            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1882
1883        // All regions exist.
1884        let list_metadata_request = ListMetadataRequest {
1885            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1886        };
1887        let response = mock_region_server
1888            .handle_list_metadata_request(&list_metadata_request)
1889            .await
1890            .unwrap();
1891        let decoded_metadata: Vec<Option<RegionMetadata>> =
1892            serde_json::from_slice(&response.metadata).unwrap();
1893        assert_eq!(metadatas, decoded_metadata);
1894    }
1895
1896    #[tokio::test]
1897    async fn test_handle_list_metadata_not_found() {
1898        common_telemetry::init_default_ut_logging();
1899
1900        let mut mock_region_server = mock_region_server();
1901        let region_id_1 = RegionId::new(1, 0);
1902        let region_id_2 = RegionId::new(2, 0);
1903
1904        let metadata_1 = mock_region_metadata(region_id_1);
1905        let metadatas = vec![Some(metadata_1.clone()), None];
1906
1907        let metadata_1 = Arc::new(metadata_1);
1908        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1909            MITO_ENGINE_NAME,
1910            Box::new(move |region_id| {
1911                if region_id == region_id_1 {
1912                    Ok(metadata_1.clone())
1913                } else {
1914                    error::RegionNotFoundSnafu { region_id }.fail()
1915                }
1916            }),
1917        );
1918
1919        mock_region_server.register_engine(engine.clone());
1920        mock_region_server
1921            .inner
1922            .region_map
1923            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1924
1925        // Not in region map.
1926        let list_metadata_request = ListMetadataRequest {
1927            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1928        };
1929        let response = mock_region_server
1930            .handle_list_metadata_request(&list_metadata_request)
1931            .await
1932            .unwrap();
1933        let decoded_metadata: Vec<Option<RegionMetadata>> =
1934            serde_json::from_slice(&response.metadata).unwrap();
1935        assert_eq!(metadatas, decoded_metadata);
1936
1937        // Not in region engine.
1938        mock_region_server
1939            .inner
1940            .region_map
1941            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1942        let response = mock_region_server
1943            .handle_list_metadata_request(&list_metadata_request)
1944            .await
1945            .unwrap();
1946        let decoded_metadata: Vec<Option<RegionMetadata>> =
1947            serde_json::from_slice(&response.metadata).unwrap();
1948        assert_eq!(metadatas, decoded_metadata);
1949    }
1950
1951    #[tokio::test]
1952    async fn test_handle_list_metadata_failed() {
1953        common_telemetry::init_default_ut_logging();
1954
1955        let mut mock_region_server = mock_region_server();
1956        let region_id_1 = RegionId::new(1, 0);
1957
1958        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1959            MITO_ENGINE_NAME,
1960            Box::new(move |region_id| {
1961                error::UnexpectedSnafu {
1962                    violated: format!("Failed to get region {region_id}"),
1963                }
1964                .fail()
1965            }),
1966        );
1967
1968        mock_region_server.register_engine(engine.clone());
1969        mock_region_server
1970            .inner
1971            .region_map
1972            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1973
1974        // Failed to get.
1975        let list_metadata_request = ListMetadataRequest {
1976            region_ids: vec![region_id_1.as_u64()],
1977        };
1978        mock_region_server
1979            .handle_list_metadata_request(&list_metadata_request)
1980            .await
1981            .unwrap_err();
1982    }
1983}