datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::{Arc, RwLock};
21use std::time::Duration;
22
23use api::region::RegionResponse;
24use api::v1::meta::TopicStat;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
28};
29use api::v1::{ResponseHeader, Status};
30use arrow_flight::{FlightData, Ticket};
31use async_trait::async_trait;
32use bytes::Bytes;
33use common_error::ext::{BoxedError, ErrorExt};
34use common_error::status_code::StatusCode;
35use common_meta::datanode::TopicStatsReporter;
36use common_query::OutputData;
37use common_query::request::QueryRequest;
38use common_recordbatch::SendableRecordBatchStream;
39use common_runtime::Runtime;
40use common_telemetry::tracing::{self, info_span};
41use common_telemetry::tracing_context::{FutureExt, TracingContext};
42use common_telemetry::{debug, error, info, warn};
43use dashmap::DashMap;
44use datafusion::datasource::TableProvider;
45use datafusion_common::tree_node::TreeNode;
46use futures_util::future::try_join_all;
47use metric_engine::engine::MetricEngine;
48use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
49use prost::Message;
50use query::QueryEngineRef;
51pub use query::dummy_catalog::{
52    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
53};
54use serde_json;
55use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
56use servers::grpc::FlightCompression;
57use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
58use servers::grpc::region_server::RegionServerHandler;
59use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
60use snafu::{OptionExt, ResultExt, ensure};
61use store_api::metric_engine_consts::{
62    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
63};
64use store_api::region_engine::{
65    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
66    SettableRegionRoleState,
67};
68use store_api::region_request::{
69    AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
70};
71use store_api::storage::RegionId;
72use tokio::sync::{Semaphore, SemaphorePermit};
73use tokio::time::timeout;
74use tonic::{Request, Response, Result as TonicResult};
75
76use crate::error::{
77    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
78    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
79    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
80    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
81    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
82    Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
83};
84use crate::event_listener::RegionServerEventListenerRef;
85use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
86
/// Datanode-side server that routes region requests and reads to registered region engines.
///
/// Cloning is cheap: every clone shares the same `RegionServerInner` through an `Arc`.
#[derive(Clone)]
pub struct RegionServer {
    inner: Arc<RegionServerInner>,
    /// Compression setting handed to `FlightRecordBatchStream` when answering Flight `do_get`.
    flight_compression: FlightCompression,
}
92
/// Per-region entry produced by `RegionServer::reportable_regions`.
pub struct RegionStat {
    /// Id of the reported region.
    pub region_id: RegionId,
    /// Name of the engine serving the region.
    pub engine: String,
    /// Role the engine currently reports for the region.
    pub role: RegionRole,
}
98
99impl RegionServer {
100    pub fn new(
101        query_engine: QueryEngineRef,
102        runtime: Runtime,
103        event_listener: RegionServerEventListenerRef,
104        flight_compression: FlightCompression,
105    ) -> Self {
106        Self::with_table_provider(
107            query_engine,
108            runtime,
109            event_listener,
110            Arc::new(DummyTableProviderFactory),
111            0,
112            Duration::from_millis(0),
113            flight_compression,
114        )
115    }
116
117    pub fn with_table_provider(
118        query_engine: QueryEngineRef,
119        runtime: Runtime,
120        event_listener: RegionServerEventListenerRef,
121        table_provider_factory: TableProviderFactoryRef,
122        max_concurrent_queries: usize,
123        concurrent_query_limiter_timeout: Duration,
124        flight_compression: FlightCompression,
125    ) -> Self {
126        Self {
127            inner: Arc::new(RegionServerInner::new(
128                query_engine,
129                runtime,
130                event_listener,
131                table_provider_factory,
132                RegionServerParallelism::from_opts(
133                    max_concurrent_queries,
134                    concurrent_query_limiter_timeout,
135                ),
136            )),
137            flight_compression,
138        }
139    }
140
141    /// Registers an engine.
142    pub fn register_engine(&mut self, engine: RegionEngineRef) {
143        self.inner.register_engine(engine);
144    }
145
146    /// Sets the topic stats.
147    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
148        self.inner.set_topic_stats_reporter(topic_stats_reporter);
149    }
150
151    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
152    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
153        match self.inner.get_engine(region_id, &RegionChange::None) {
154            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
155            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
156            Err(error::Error::RegionNotFound { .. }) => Ok(None),
157            Err(err) => Err(err),
158        }
159    }
160
161    #[tracing::instrument(skip_all)]
162    pub async fn handle_batch_open_requests(
163        &self,
164        parallelism: usize,
165        requests: Vec<(RegionId, RegionOpenRequest)>,
166        ignore_nonexistent_region: bool,
167    ) -> Result<Vec<RegionId>> {
168        self.inner
169            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
170            .await
171    }
172
173    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
174    pub async fn handle_request(
175        &self,
176        region_id: RegionId,
177        request: RegionRequest,
178    ) -> Result<RegionResponse> {
179        self.inner.handle_request(region_id, request).await
180    }
181
182    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
183    async fn table_provider(
184        &self,
185        region_id: RegionId,
186        ctx: Option<QueryContextRef>,
187    ) -> Result<Arc<dyn TableProvider>> {
188        let status = self
189            .inner
190            .region_map
191            .get(&region_id)
192            .context(RegionNotFoundSnafu { region_id })?
193            .clone();
194        ensure!(
195            matches!(status, RegionEngineWithStatus::Ready(_)),
196            RegionNotReadySnafu { region_id }
197        );
198
199        self.inner
200            .table_provider_factory
201            .create(region_id, status.into_engine(), ctx)
202            .await
203            .context(ExecuteLogicalPlanSnafu)
204    }
205
206    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
207    pub async fn handle_remote_read(
208        &self,
209        request: api::v1::region::QueryRequest,
210        query_ctx: QueryContextRef,
211    ) -> Result<SendableRecordBatchStream> {
212        let _permit = if let Some(p) = &self.inner.parallelism {
213            Some(p.acquire().await?)
214        } else {
215            None
216        };
217
218        let region_id = RegionId::from_u64(request.region_id);
219        let catalog_list = Arc::new(NameAwareCatalogList::new(
220            self.clone(),
221            region_id,
222            query_ctx.clone(),
223        ));
224
225        if query_ctx.explain_verbose() {
226            common_telemetry::info!("Handle remote read for region: {}", region_id);
227        }
228
229        let decoder = self
230            .inner
231            .query_engine
232            .engine_context(query_ctx.clone())
233            .new_plan_decoder()
234            .context(NewPlanDecoderSnafu)?;
235
236        let plan = decoder
237            .decode(Bytes::from(request.plan), catalog_list, false)
238            .await
239            .context(DecodeLogicalPlanSnafu)?;
240
241        self.inner
242            .handle_read(
243                QueryRequest {
244                    header: request.header,
245                    region_id,
246                    plan,
247                },
248                query_ctx,
249            )
250            .await
251    }
252
253    #[tracing::instrument(skip_all)]
254    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
255        let _permit = if let Some(p) = &self.inner.parallelism {
256            Some(p.acquire().await?)
257        } else {
258            None
259        };
260
261        let ctx = request.header.as_ref().map(|h| h.into());
262        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
263
264        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
265            .context(DataFusionSnafu)?;
266        let mut injector = injector_builder
267            .build(self, request.region_id, query_ctx.clone())
268            .await?;
269
270        let plan = request
271            .plan
272            .rewrite(&mut injector)
273            .context(DataFusionSnafu)?
274            .data;
275
276        self.inner
277            .handle_read(QueryRequest { plan, ..request }, query_ctx)
278            .await
279    }
280
281    /// Returns all opened and reportable regions.
282    ///
283    /// Notes: except all metrics regions.
284    pub fn reportable_regions(&self) -> Vec<RegionStat> {
285        self.inner
286            .region_map
287            .iter()
288            .filter_map(|e| {
289                let region_id = *e.key();
290                // Filters out any regions whose role equals None.
291                e.role(region_id).map(|role| RegionStat {
292                    region_id,
293                    engine: e.value().name().to_string(),
294                    role,
295                })
296            })
297            .collect()
298    }
299
300    /// Returns the reportable topics.
301    pub fn topic_stats(&self) -> Vec<TopicStat> {
302        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
303        let Some(reporter) = reporter.as_mut() else {
304            return vec![];
305        };
306        reporter
307            .reportable_topics()
308            .into_iter()
309            .map(|stat| TopicStat {
310                topic_name: stat.topic,
311                record_size: stat.record_size,
312                record_num: stat.record_num,
313                latest_entry_id: stat.latest_entry_id,
314            })
315            .collect()
316    }
317
318    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
319        self.inner.region_map.get(&region_id).and_then(|engine| {
320            engine.role(region_id).map(|role| match role {
321                RegionRole::Follower => false,
322                RegionRole::Leader => true,
323                RegionRole::DowngradingLeader => true,
324            })
325        })
326    }
327
328    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
329        let engine = self
330            .inner
331            .region_map
332            .get(&region_id)
333            .with_context(|| RegionNotFoundSnafu { region_id })?;
334        engine
335            .set_region_role(region_id, role)
336            .with_context(|_| HandleRegionRequestSnafu { region_id })
337    }
338
339    /// Set region role state gracefully.
340    ///
341    /// For [SettableRegionRoleState::Follower]:
342    /// After the call returns, the engine ensures that
343    /// no **further** write or flush operations will succeed in this region.
344    ///
345    /// For [SettableRegionRoleState::DowngradingLeader]:
346    /// After the call returns, the engine ensures that
347    /// no **further** write operations will succeed in this region.
348    pub async fn set_region_role_state_gracefully(
349        &self,
350        region_id: RegionId,
351        state: SettableRegionRoleState,
352    ) -> Result<SetRegionRoleStateResponse> {
353        match self.inner.region_map.get(&region_id) {
354            Some(engine) => Ok(engine
355                .set_region_role_state_gracefully(region_id, state)
356                .await
357                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
358            None => Ok(SetRegionRoleStateResponse::NotFound),
359        }
360    }
361
362    pub fn runtime(&self) -> Runtime {
363        self.inner.runtime.clone()
364    }
365
366    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
367        match self.inner.region_map.get(&region_id) {
368            Some(e) => e.region_statistic(region_id),
369            None => None,
370        }
371    }
372
373    /// Stop the region server.
374    pub async fn stop(&self) -> Result<()> {
375        self.inner.stop().await
376    }
377
378    #[cfg(test)]
379    /// Registers a region for test purpose.
380    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
381        self.inner
382            .region_map
383            .insert(region_id, RegionEngineWithStatus::Ready(engine));
384    }
385
386    async fn handle_batch_ddl_requests(
387        &self,
388        request: region_request::Body,
389    ) -> Result<RegionResponse> {
390        // Safety: we have already checked the request type in `RegionServer::handle()`.
391        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
392            .context(BuildRegionRequestsSnafu)?
393            .unwrap();
394        let tracing_context = TracingContext::from_current_span();
395
396        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
397        self.inner
398            .handle_batch_request(batch_request)
399            .trace(span)
400            .await
401    }
402
403    async fn handle_requests_in_parallel(
404        &self,
405        request: region_request::Body,
406    ) -> Result<RegionResponse> {
407        let requests =
408            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
409        let tracing_context = TracingContext::from_current_span();
410
411        let join_tasks = requests.into_iter().map(|(region_id, req)| {
412            let self_to_move = self;
413            let span = tracing_context.attach(info_span!(
414                "RegionServer::handle_region_request",
415                region_id = region_id.to_string()
416            ));
417            async move {
418                self_to_move
419                    .handle_request(region_id, req)
420                    .trace(span)
421                    .await
422            }
423        });
424
425        let results = try_join_all(join_tasks).await?;
426        let mut affected_rows = 0;
427        let mut extensions = HashMap::new();
428        for result in results {
429            affected_rows += result.affected_rows;
430            extensions.extend(result.extensions);
431        }
432
433        Ok(RegionResponse {
434            affected_rows,
435            extensions,
436            metadata: Vec::new(),
437        })
438    }
439
440    async fn handle_requests_in_serial(
441        &self,
442        request: region_request::Body,
443    ) -> Result<RegionResponse> {
444        let requests =
445            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
446        let tracing_context = TracingContext::from_current_span();
447
448        let mut affected_rows = 0;
449        let mut extensions = HashMap::new();
450        // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
451        // modify this part to avoid inefficient serial loop calls.
452        for (region_id, req) in requests {
453            let span = tracing_context.attach(info_span!(
454                "RegionServer::handle_region_request",
455                region_id = region_id.to_string()
456            ));
457            let result = self.handle_request(region_id, req).trace(span).await?;
458
459            affected_rows += result.affected_rows;
460            extensions.extend(result.extensions);
461        }
462
463        Ok(RegionResponse {
464            affected_rows,
465            extensions,
466            metadata: Vec::new(),
467        })
468    }
469
470    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
471        let region_id = RegionId::from_u64(request.region_id);
472        let manifest_info = request
473            .manifest_info
474            .context(error::MissingRequiredFieldSnafu {
475                name: "manifest_info",
476            })?;
477
478        let manifest_info = match manifest_info {
479            ManifestInfo::MitoManifestInfo(info) => {
480                RegionManifestInfo::mito(info.data_manifest_version, 0)
481            }
482            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
483                info.data_manifest_version,
484                0,
485                info.metadata_manifest_version,
486                0,
487            ),
488        };
489
490        let tracing_context = TracingContext::from_current_span();
491        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
492
493        self.sync_region(region_id, manifest_info)
494            .trace(span)
495            .await
496            .map(|_| RegionResponse::new(AffectedRows::default()))
497    }
498
499    /// Handles the ListMetadata request and retrieves metadata for specified regions.
500    ///
501    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
502    /// non-existing regions as `null`.
503    #[tracing::instrument(skip_all)]
504    async fn handle_list_metadata_request(
505        &self,
506        request: &ListMetadataRequest,
507    ) -> Result<RegionResponse> {
508        let mut region_metadatas = Vec::new();
509        // Collect metadata for each region
510        for region_id in &request.region_ids {
511            let region_id = RegionId::from_u64(*region_id);
512            // Get the engine.
513            let Some(engine) = self.find_engine(region_id)? else {
514                region_metadatas.push(None);
515                continue;
516            };
517
518            match engine.get_metadata(region_id).await {
519                Ok(metadata) => region_metadatas.push(Some(metadata)),
520                Err(err) => {
521                    if err.status_code() == StatusCode::RegionNotFound {
522                        region_metadatas.push(None);
523                    } else {
524                        Err(err).with_context(|_| GetRegionMetadataSnafu {
525                            engine: engine.name(),
526                            region_id,
527                        })?;
528                    }
529                }
530            }
531        }
532
533        // Serialize metadata to JSON
534        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
535
536        let response = RegionResponse::from_metadata(json_result);
537
538        Ok(response)
539    }
540
541    /// Sync region manifest and registers new opened logical regions.
542    pub async fn sync_region(
543        &self,
544        region_id: RegionId,
545        manifest_info: RegionManifestInfo,
546    ) -> Result<()> {
547        let engine_with_status = self
548            .inner
549            .region_map
550            .get(&region_id)
551            .with_context(|| RegionNotFoundSnafu { region_id })?;
552
553        self.inner
554            .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
555            .await
556    }
557}
558
559#[async_trait]
560impl RegionServerHandler for RegionServer {
561    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
562        let response = match &request {
563            region_request::Body::Creates(_)
564            | region_request::Body::Drops(_)
565            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
566            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
567                self.handle_requests_in_parallel(request).await
568            }
569            region_request::Body::Sync(sync_request) => {
570                self.handle_sync_region_request(sync_request).await
571            }
572            region_request::Body::ListMetadata(list_metadata_request) => {
573                self.handle_list_metadata_request(list_metadata_request)
574                    .await
575            }
576            _ => self.handle_requests_in_serial(request).await,
577        }
578        .map_err(BoxedError::new)
579        .context(ExecuteGrpcRequestSnafu)?;
580
581        Ok(RegionResponseV1 {
582            header: Some(ResponseHeader {
583                status: Some(Status {
584                    status_code: StatusCode::Success as _,
585                    ..Default::default()
586                }),
587            }),
588            affected_rows: response.affected_rows as _,
589            extensions: response.extensions,
590            metadata: response.metadata,
591        })
592    }
593}
594
595#[async_trait]
596impl FlightCraft for RegionServer {
597    async fn do_get(
598        &self,
599        request: Request<Ticket>,
600    ) -> TonicResult<Response<TonicStream<FlightData>>> {
601        let ticket = request.into_inner().ticket;
602        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
603            .context(servers_error::InvalidFlightTicketSnafu)?;
604        let tracing_context = request
605            .header
606            .as_ref()
607            .map(|h| TracingContext::from_w3c(&h.tracing_context))
608            .unwrap_or_default();
609        let query_ctx = request
610            .header
611            .as_ref()
612            .map(|h| Arc::new(QueryContext::from(h)))
613            .unwrap_or(QueryContext::arc());
614
615        let result = self
616            .handle_remote_read(request, query_ctx.clone())
617            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
618            .await?;
619
620        let stream = Box::pin(FlightRecordBatchStream::new(
621            result,
622            tracing_context,
623            self.flight_compression,
624            query_ctx,
625        ));
626        Ok(Response::new(stream))
627    }
628}
629
/// The engine serving a region, tagged with the region's lifecycle state.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
639
640impl RegionEngineWithStatus {
641    /// Returns [RegionEngineRef].
642    pub fn into_engine(self) -> RegionEngineRef {
643        match self {
644            RegionEngineWithStatus::Registering(engine) => engine,
645            RegionEngineWithStatus::Deregistering(engine) => engine,
646            RegionEngineWithStatus::Ready(engine) => engine,
647        }
648    }
649
650    /// Returns [RegionEngineRef] reference.
651    pub fn engine(&self) -> &RegionEngineRef {
652        match self {
653            RegionEngineWithStatus::Registering(engine) => engine,
654            RegionEngineWithStatus::Deregistering(engine) => engine,
655            RegionEngineWithStatus::Ready(engine) => engine,
656        }
657    }
658}
659
660impl Deref for RegionEngineWithStatus {
661    type Target = RegionEngineRef;
662
663    fn deref(&self) -> &Self::Target {
664        match self {
665            RegionEngineWithStatus::Registering(engine) => engine,
666            RegionEngineWithStatus::Deregistering(engine) => engine,
667            RegionEngineWithStatus::Ready(engine) => engine,
668        }
669    }
670}
671
/// Shared state behind every clone of [RegionServer].
struct RegionServerInner {
    // Registered region engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    // Every known region together with its lifecycle status and serving engine.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    // Query engine used to build plan decoders for remote reads.
    query_engine: QueryEngineRef,
    // Runtime exposed through `RegionServer::runtime`.
    runtime: Runtime,
    // NOTE(review): presumably notified on region lifecycle changes — confirm in the
    // `RegionServerInner` methods outside this view.
    event_listener: RegionServerEventListenerRef,
    // Builds the table providers used to read regions.
    table_provider_factory: TableProviderFactoryRef,
    // The number of queries allowed to be executed at the same time.
    // Act as last line of defense on datanode to prevent query overloading.
    // `None` means no limit.
    parallelism: Option<RegionServerParallelism>,
    // The topic stats reporter; `None` until one is set.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    // HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    // server with a concrete engine; acceptable for now to fetch Mito-specific
    // info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
}
689
/// Limits how many queries may run on the region server at the same time.
struct RegionServerParallelism {
    /// One permit per query allowed to run concurrently.
    semaphore: Semaphore,
    /// How long `acquire` waits for a permit before failing with a timeout error.
    timeout: Duration,
}
694
695impl RegionServerParallelism {
696    pub fn from_opts(
697        max_concurrent_queries: usize,
698        concurrent_query_limiter_timeout: Duration,
699    ) -> Option<Self> {
700        if max_concurrent_queries == 0 {
701            return None;
702        }
703        Some(RegionServerParallelism {
704            semaphore: Semaphore::new(max_concurrent_queries),
705            timeout: concurrent_query_limiter_timeout,
706        })
707    }
708
709    pub async fn acquire(&self) -> Result<SemaphorePermit> {
710        timeout(self.timeout, self.semaphore.acquire())
711            .await
712            .context(ConcurrentQueryLimiterTimeoutSnafu)?
713            .context(ConcurrentQueryLimiterClosedSnafu)
714    }
715}
716
/// Result of resolving which engine should handle a region request.
enum CurrentEngine {
    /// The request should be handled by this engine.
    Engine(RegionEngineRef),
    /// No engine call is needed; carries the affected rows to report.
    /// NOTE(review): presumably returned to the caller as-is — confirm in `handle_request`.
    EarlyReturn(AffectedRows),
}
721
722impl Debug for CurrentEngine {
723    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
724        match self {
725            CurrentEngine::Engine(engine) => f
726                .debug_struct("CurrentEngine")
727                .field("engine", &engine.name())
728                .finish(),
729            CurrentEngine::EarlyReturn(rows) => f
730                .debug_struct("CurrentEngine")
731                .field("return", rows)
732                .finish(),
733        }
734    }
735}
736
737impl RegionServerInner {
738    pub fn new(
739        query_engine: QueryEngineRef,
740        runtime: Runtime,
741        event_listener: RegionServerEventListenerRef,
742        table_provider_factory: TableProviderFactoryRef,
743        parallelism: Option<RegionServerParallelism>,
744    ) -> Self {
745        Self {
746            engines: RwLock::new(HashMap::new()),
747            region_map: DashMap::new(),
748            query_engine,
749            runtime,
750            event_listener,
751            table_provider_factory,
752            parallelism,
753            topic_stats_reporter: RwLock::new(None),
754            mito_engine: RwLock::new(None),
755        }
756    }
757
758    pub fn register_engine(&self, engine: RegionEngineRef) {
759        let engine_name = engine.name();
760        if engine_name == MITO_ENGINE_NAME
761            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
762        {
763            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
764        }
765
766        info!("Region Engine {engine_name} is registered");
767        self.engines
768            .write()
769            .unwrap()
770            .insert(engine_name.to_string(), engine);
771    }
772
773    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
774        info!("Set topic stats reporter");
775        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
776    }
777
778    fn get_engine(
779        &self,
780        region_id: RegionId,
781        region_change: &RegionChange,
782    ) -> Result<CurrentEngine> {
783        let current_region_status = self.region_map.get(&region_id);
784
785        let engine = match region_change {
786            RegionChange::Register(attribute) => match current_region_status {
787                Some(status) => match status.clone() {
788                    RegionEngineWithStatus::Registering(engine) => engine,
789                    RegionEngineWithStatus::Deregistering(_) => {
790                        return error::RegionBusySnafu { region_id }.fail();
791                    }
792                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
793                },
794                _ => self
795                    .engines
796                    .read()
797                    .unwrap()
798                    .get(attribute.engine())
799                    .with_context(|| RegionEngineNotFoundSnafu {
800                        name: attribute.engine(),
801                    })?
802                    .clone(),
803            },
804            RegionChange::Deregisters => match current_region_status {
805                Some(status) => match status.clone() {
806                    RegionEngineWithStatus::Registering(_) => {
807                        return error::RegionBusySnafu { region_id }.fail();
808                    }
809                    RegionEngineWithStatus::Deregistering(_) => {
810                        return Ok(CurrentEngine::EarlyReturn(0));
811                    }
812                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
813                },
814                None => return Ok(CurrentEngine::EarlyReturn(0)),
815            },
816            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
817                match current_region_status {
818                    Some(status) => match status.clone() {
819                        RegionEngineWithStatus::Registering(_) => {
820                            return error::RegionNotReadySnafu { region_id }.fail();
821                        }
822                        RegionEngineWithStatus::Deregistering(_) => {
823                            return error::RegionNotFoundSnafu { region_id }.fail();
824                        }
825                        RegionEngineWithStatus::Ready(engine) => engine,
826                    },
827                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
828                }
829            }
830        };
831
832        Ok(CurrentEngine::Engine(engine))
833    }
834
835    async fn handle_batch_open_requests_inner(
836        &self,
837        engine: RegionEngineRef,
838        parallelism: usize,
839        requests: Vec<(RegionId, RegionOpenRequest)>,
840        ignore_nonexistent_region: bool,
841    ) -> Result<Vec<RegionId>> {
842        let region_changes = requests
843            .iter()
844            .map(|(region_id, open)| {
845                let attribute = parse_region_attribute(&open.engine, &open.options)?;
846                Ok((*region_id, RegionChange::Register(attribute)))
847            })
848            .collect::<Result<HashMap<_, _>>>()?;
849
850        for (&region_id, region_change) in &region_changes {
851            self.set_region_status_not_ready(region_id, &engine, region_change)
852        }
853
854        let mut open_regions = Vec::with_capacity(requests.len());
855        let mut errors = vec![];
856        match engine
857            .handle_batch_open_requests(parallelism, requests)
858            .await
859            .with_context(|_| HandleBatchOpenRequestSnafu)
860        {
861            Ok(results) => {
862                for (region_id, result) in results {
863                    let region_change = &region_changes[&region_id];
864                    match result {
865                        Ok(_) => {
866                            if let Err(e) = self
867                                .set_region_status_ready(region_id, engine.clone(), *region_change)
868                                .await
869                            {
870                                error!(e; "Failed to set region to ready: {}", region_id);
871                                errors.push(BoxedError::new(e));
872                            } else {
873                                open_regions.push(region_id)
874                            }
875                        }
876                        Err(e) => {
877                            self.unset_region_status(region_id, &engine, *region_change);
878                            if e.status_code() == StatusCode::RegionNotFound
879                                && ignore_nonexistent_region
880                            {
881                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
882                            } else {
883                                error!(e; "Failed to open region: {}", region_id);
884                                errors.push(e);
885                            }
886                        }
887                    }
888                }
889            }
890            Err(e) => {
891                for (&region_id, region_change) in &region_changes {
892                    self.unset_region_status(region_id, &engine, *region_change);
893                }
894                error!(e; "Failed to open batch regions");
895                errors.push(BoxedError::new(e));
896            }
897        }
898
899        if !errors.is_empty() {
900            return error::UnexpectedSnafu {
901                // Returns the first error.
902                violated: format!("Failed to open batch regions: {:?}", errors[0]),
903            }
904            .fail();
905        }
906
907        Ok(open_regions)
908    }
909
910    pub async fn handle_batch_open_requests(
911        &self,
912        parallelism: usize,
913        requests: Vec<(RegionId, RegionOpenRequest)>,
914        ignore_nonexistent_region: bool,
915    ) -> Result<Vec<RegionId>> {
916        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
917            HashMap::with_capacity(requests.len());
918        for (region_id, request) in requests {
919            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
920                requests.push((region_id, request));
921            } else {
922                engine_grouped_requests
923                    .insert(request.engine.to_string(), vec![(region_id, request)]);
924            }
925        }
926
927        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
928        for (engine, requests) in engine_grouped_requests {
929            let engine = self
930                .engines
931                .read()
932                .unwrap()
933                .get(&engine)
934                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
935                .clone();
936            results.push(
937                self.handle_batch_open_requests_inner(
938                    engine,
939                    parallelism,
940                    requests,
941                    ignore_nonexistent_region,
942                )
943                .await,
944            )
945        }
946
947        Ok(results
948            .into_iter()
949            .collect::<Result<Vec<_>>>()?
950            .into_iter()
951            .flatten()
952            .collect::<Vec<_>>())
953    }
954
955    // Handle requests in batch.
956    //
957    // limitation: all create requests must be in the same engine.
958    pub async fn handle_batch_request(
959        &self,
960        batch_request: BatchRegionDdlRequest,
961    ) -> Result<RegionResponse> {
962        let region_changes = match &batch_request {
963            BatchRegionDdlRequest::Create(requests) => requests
964                .iter()
965                .map(|(region_id, create)| {
966                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
967                    Ok((*region_id, RegionChange::Register(attribute)))
968                })
969                .collect::<Result<Vec<_>>>()?,
970            BatchRegionDdlRequest::Drop(requests) => requests
971                .iter()
972                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
973                .collect::<Vec<_>>(),
974            BatchRegionDdlRequest::Alter(requests) => requests
975                .iter()
976                .map(|(region_id, _)| (*region_id, RegionChange::None))
977                .collect::<Vec<_>>(),
978        };
979
980        // The ddl procedure will ensure all requests are in the same engine.
981        // Therefore, we can get the engine from the first request.
982        let (first_region_id, first_region_change) = region_changes.first().unwrap();
983        let engine = match self.get_engine(*first_region_id, first_region_change)? {
984            CurrentEngine::Engine(engine) => engine,
985            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
986        };
987
988        for (region_id, region_change) in region_changes.iter() {
989            self.set_region_status_not_ready(*region_id, &engine, region_change);
990        }
991
992        let ddl_type = batch_request.request_type();
993        let result = engine
994            .handle_batch_ddl_requests(batch_request)
995            .await
996            .context(HandleBatchDdlRequestSnafu { ddl_type });
997
998        match result {
999            Ok(result) => {
1000                for (region_id, region_change) in &region_changes {
1001                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1002                        .await?;
1003                }
1004
1005                Ok(RegionResponse {
1006                    affected_rows: result.affected_rows,
1007                    extensions: result.extensions,
1008                    metadata: Vec::new(),
1009                })
1010            }
1011            Err(err) => {
1012                for (region_id, region_change) in region_changes {
1013                    self.unset_region_status(region_id, &engine, region_change);
1014                }
1015
1016                Err(err)
1017            }
1018        }
1019    }
1020
1021    pub async fn handle_request(
1022        &self,
1023        region_id: RegionId,
1024        request: RegionRequest,
1025    ) -> Result<RegionResponse> {
1026        let request_type = request.request_type();
1027        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1028            .with_label_values(&[request_type])
1029            .start_timer();
1030
1031        let region_change = match &request {
1032            RegionRequest::Create(create) => {
1033                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1034                RegionChange::Register(attribute)
1035            }
1036            RegionRequest::Open(open) => {
1037                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1038                RegionChange::Register(attribute)
1039            }
1040            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1041            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1042                RegionChange::Ingest
1043            }
1044            RegionRequest::Alter(_)
1045            | RegionRequest::Flush(_)
1046            | RegionRequest::Compact(_)
1047            | RegionRequest::Truncate(_) => RegionChange::None,
1048            RegionRequest::Catchup(_) => RegionChange::Catchup,
1049        };
1050
1051        let engine = match self.get_engine(region_id, &region_change)? {
1052            CurrentEngine::Engine(engine) => engine,
1053            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1054        };
1055
1056        // Sets corresponding region status to registering/deregistering before the operation.
1057        self.set_region_status_not_ready(region_id, &engine, &region_change);
1058
1059        match engine
1060            .handle_request(region_id, request)
1061            .await
1062            .with_context(|_| HandleRegionRequestSnafu { region_id })
1063        {
1064            Ok(result) => {
1065                // Update metrics
1066                if matches!(region_change, RegionChange::Ingest) {
1067                    crate::metrics::REGION_CHANGED_ROW_COUNT
1068                        .with_label_values(&[request_type])
1069                        .inc_by(result.affected_rows as u64);
1070                }
1071                // Sets corresponding region status to ready.
1072                self.set_region_status_ready(region_id, engine.clone(), region_change)
1073                    .await?;
1074
1075                Ok(RegionResponse {
1076                    affected_rows: result.affected_rows,
1077                    extensions: result.extensions,
1078                    metadata: Vec::new(),
1079                })
1080            }
1081            Err(err) => {
1082                // Removes the region status if the operation fails.
1083                self.unset_region_status(region_id, &engine, region_change);
1084                Err(err)
1085            }
1086        }
1087    }
1088
1089    /// Handles the sync region request.
1090    pub async fn handle_sync_region(
1091        &self,
1092        engine: &RegionEngineRef,
1093        region_id: RegionId,
1094        manifest_info: RegionManifestInfo,
1095    ) -> Result<()> {
1096        let Some(new_opened_regions) = engine
1097            .sync_region(region_id, manifest_info)
1098            .await
1099            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1100            .new_opened_logical_region_ids()
1101        else {
1102            warn!("No new opened logical regions");
1103            return Ok(());
1104        };
1105
1106        for region in new_opened_regions {
1107            self.region_map
1108                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1109            info!("Logical region {} is registered!", region);
1110        }
1111
1112        Ok(())
1113    }
1114
1115    fn set_region_status_not_ready(
1116        &self,
1117        region_id: RegionId,
1118        engine: &RegionEngineRef,
1119        region_change: &RegionChange,
1120    ) {
1121        match region_change {
1122            RegionChange::Register(_) => {
1123                self.region_map.insert(
1124                    region_id,
1125                    RegionEngineWithStatus::Registering(engine.clone()),
1126                );
1127            }
1128            RegionChange::Deregisters => {
1129                self.region_map.insert(
1130                    region_id,
1131                    RegionEngineWithStatus::Deregistering(engine.clone()),
1132                );
1133            }
1134            _ => {}
1135        }
1136    }
1137
1138    fn unset_region_status(
1139        &self,
1140        region_id: RegionId,
1141        engine: &RegionEngineRef,
1142        region_change: RegionChange,
1143    ) {
1144        match region_change {
1145            RegionChange::None | RegionChange::Ingest => {}
1146            RegionChange::Register(_) => {
1147                self.region_map.remove(&region_id);
1148            }
1149            RegionChange::Deregisters => {
1150                self.region_map
1151                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1152            }
1153            RegionChange::Catchup => {}
1154        }
1155    }
1156
1157    async fn set_region_status_ready(
1158        &self,
1159        region_id: RegionId,
1160        engine: RegionEngineRef,
1161        region_change: RegionChange,
1162    ) -> Result<()> {
1163        let engine_type = engine.name();
1164        match region_change {
1165            RegionChange::None | RegionChange::Ingest => {}
1166            RegionChange::Register(attribute) => {
1167                info!(
1168                    "Region {region_id} is registered to engine {}",
1169                    attribute.engine()
1170                );
1171                self.region_map
1172                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1173
1174                match attribute {
1175                    RegionAttribute::Metric { physical } => {
1176                        if physical {
1177                            // Registers the logical regions belong to the physical region (`region_id`).
1178                            self.register_logical_regions(&engine, region_id).await?;
1179                            // We only send the `on_region_registered` event of the physical region.
1180                            self.event_listener.on_region_registered(region_id);
1181                        }
1182                    }
1183                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1184                    RegionAttribute::File => {
1185                        // do nothing
1186                    }
1187                }
1188            }
1189            RegionChange::Deregisters => {
1190                info!("Region {region_id} is deregistered from engine {engine_type}");
1191                self.region_map
1192                    .remove(&region_id)
1193                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1194                self.event_listener.on_region_deregistered(region_id);
1195            }
1196            RegionChange::Catchup => {
1197                if is_metric_engine(engine.name()) {
1198                    // Registers the logical regions belong to the physical region (`region_id`).
1199                    self.register_logical_regions(&engine, region_id).await?;
1200                }
1201            }
1202        }
1203        Ok(())
1204    }
1205
1206    async fn register_logical_regions(
1207        &self,
1208        engine: &RegionEngineRef,
1209        physical_region_id: RegionId,
1210    ) -> Result<()> {
1211        let metric_engine =
1212            engine
1213                .as_any()
1214                .downcast_ref::<MetricEngine>()
1215                .context(UnexpectedSnafu {
1216                    violated: format!(
1217                        "expecting engine type '{}', actual '{}'",
1218                        METRIC_ENGINE_NAME,
1219                        engine.name(),
1220                    ),
1221                })?;
1222
1223        let logical_regions = metric_engine
1224            .logical_regions(physical_region_id)
1225            .await
1226            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1227
1228        for region in logical_regions {
1229            self.region_map
1230                .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1231            info!("Logical region {} is registered!", region);
1232        }
1233        Ok(())
1234    }
1235
1236    pub async fn handle_read(
1237        &self,
1238        request: QueryRequest,
1239        query_ctx: QueryContextRef,
1240    ) -> Result<SendableRecordBatchStream> {
1241        // TODO(ruihang): add metrics and set trace id
1242
1243        let result = self
1244            .query_engine
1245            .execute(request.plan, query_ctx)
1246            .await
1247            .context(ExecuteLogicalPlanSnafu)?;
1248
1249        match result.data {
1250            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1251                UnsupportedOutputSnafu { expected: "stream" }.fail()
1252            }
1253            OutputData::Stream(stream) => Ok(stream),
1254        }
1255    }
1256
1257    async fn stop(&self) -> Result<()> {
1258        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1259        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1260        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1261        //
1262        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1263        // it here, collect the values first then use later separately.
1264
1265        let regions = self
1266            .region_map
1267            .iter()
1268            .map(|x| (*x.key(), x.value().clone()))
1269            .collect::<Vec<_>>();
1270        let num_regions = regions.len();
1271
1272        for (region_id, engine) in regions {
1273            let closed = engine
1274                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1275                .await;
1276            match closed {
1277                Ok(_) => debug!("Region {region_id} is closed"),
1278                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1279            }
1280        }
1281        self.region_map.clear();
1282        info!("closed {num_regions} regions");
1283
1284        drop(self.mito_engine.write().unwrap().take());
1285        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1286        for (engine_name, engine) in engines {
1287            engine
1288                .stop()
1289                .await
1290                .context(StopRegionEngineSnafu { name: &engine_name })?;
1291            info!("Region engine {engine_name} is stopped");
1292        }
1293
1294        Ok(())
1295    }
1296}
1297
/// How a region request affects the region's entry in the region map.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the region's registration (e.g. alter, flush, compact,
    /// truncate).
    None,
    /// The request registers the region (create/open). Carries the region's parsed attribute.
    Register(RegionAttribute),
    /// The request deregisters the region (close/drop).
    Deregisters,
    /// The request catches the region up. On success with the metric engine, the region's
    /// logical regions are re-registered.
    Catchup,
    /// The request writes data into the region (put/delete/bulk inserts).
    Ingest,
}
1306
/// Returns `true` if `engine` is the name of the metric engine.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1310
1311fn parse_region_attribute(
1312    engine: &str,
1313    options: &HashMap<String, String>,
1314) -> Result<RegionAttribute> {
1315    match engine {
1316        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1317        METRIC_ENGINE_NAME => {
1318            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1319
1320            Ok(RegionAttribute::Metric { physical })
1321        }
1322        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1323        _ => error::UnexpectedSnafu {
1324            violated: format!("Unknown engine: {}", engine),
1325        }
1326        .fail(),
1327    }
1328}
1329
/// Engine-specific attributes of a region, derived from its engine name and options by
/// `parse_region_attribute`.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region of the mito engine.
    Mito,
    /// A region of the metric engine. `physical` is false when the region options contain
    /// `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region of the file engine.
    File,
}
1336
1337impl RegionAttribute {
1338    fn engine(&self) -> &'static str {
1339        match self {
1340            RegionAttribute::Mito => MITO_ENGINE_NAME,
1341            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1342            RegionAttribute::File => FILE_ENGINE_NAME,
1343        }
1344    }
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349
1350    use std::assert_matches::assert_matches;
1351
1352    use api::v1::SemanticType;
1353    use common_error::ext::ErrorExt;
1354    use datatypes::prelude::ConcreteDataType;
1355    use mito2::test_util::CreateRequestBuilder;
1356    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1357    use store_api::region_engine::RegionEngine;
1358    use store_api::region_request::{
1359        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1360    };
1361    use store_api::storage::RegionId;
1362
1363    use super::*;
1364    use crate::error::Result;
1365    use crate::tests::{MockRegionEngine, mock_region_server};
1366
1367    #[tokio::test]
1368    async fn test_region_registering() {
1369        common_telemetry::init_default_ut_logging();
1370
1371        let mut mock_region_server = mock_region_server();
1372        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1373        let engine_name = engine.name();
1374        mock_region_server.register_engine(engine.clone());
1375        let region_id = RegionId::new(1, 1);
1376        let builder = CreateRequestBuilder::new();
1377        let create_req = builder.build();
1378        // Tries to create/open a registering region.
1379        mock_region_server.inner.region_map.insert(
1380            region_id,
1381            RegionEngineWithStatus::Registering(engine.clone()),
1382        );
1383        let response = mock_region_server
1384            .handle_request(region_id, RegionRequest::Create(create_req))
1385            .await
1386            .unwrap();
1387        assert_eq!(response.affected_rows, 0);
1388        let status = mock_region_server
1389            .inner
1390            .region_map
1391            .get(&region_id)
1392            .unwrap()
1393            .clone();
1394        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1395
1396        mock_region_server.inner.region_map.insert(
1397            region_id,
1398            RegionEngineWithStatus::Registering(engine.clone()),
1399        );
1400        let response = mock_region_server
1401            .handle_request(
1402                region_id,
1403                RegionRequest::Open(RegionOpenRequest {
1404                    engine: engine_name.to_string(),
1405                    table_dir: String::new(),
1406                    path_type: PathType::Bare,
1407                    options: Default::default(),
1408                    skip_wal_replay: false,
1409                    checkpoint: None,
1410                }),
1411            )
1412            .await
1413            .unwrap();
1414        assert_eq!(response.affected_rows, 0);
1415        let status = mock_region_server
1416            .inner
1417            .region_map
1418            .get(&region_id)
1419            .unwrap()
1420            .clone();
1421        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1422    }
1423
1424    #[tokio::test]
1425    async fn test_region_deregistering() {
1426        common_telemetry::init_default_ut_logging();
1427
1428        let mut mock_region_server = mock_region_server();
1429        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1430
1431        mock_region_server.register_engine(engine.clone());
1432
1433        let region_id = RegionId::new(1, 1);
1434
1435        // Tries to drop/close a registering region.
1436        mock_region_server.inner.region_map.insert(
1437            region_id,
1438            RegionEngineWithStatus::Deregistering(engine.clone()),
1439        );
1440
1441        let response = mock_region_server
1442            .handle_request(
1443                region_id,
1444                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1445            )
1446            .await
1447            .unwrap();
1448        assert_eq!(response.affected_rows, 0);
1449
1450        let status = mock_region_server
1451            .inner
1452            .region_map
1453            .get(&region_id)
1454            .unwrap()
1455            .clone();
1456        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1457
1458        mock_region_server.inner.region_map.insert(
1459            region_id,
1460            RegionEngineWithStatus::Deregistering(engine.clone()),
1461        );
1462
1463        let response = mock_region_server
1464            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1465            .await
1466            .unwrap();
1467        assert_eq!(response.affected_rows, 0);
1468
1469        let status = mock_region_server
1470            .inner
1471            .region_map
1472            .get(&region_id)
1473            .unwrap()
1474            .clone();
1475        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1476    }
1477
1478    #[tokio::test]
1479    async fn test_region_not_ready() {
1480        common_telemetry::init_default_ut_logging();
1481
1482        let mut mock_region_server = mock_region_server();
1483        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1484
1485        mock_region_server.register_engine(engine.clone());
1486
1487        let region_id = RegionId::new(1, 1);
1488
1489        // Tries to drop/close a registering region.
1490        mock_region_server.inner.region_map.insert(
1491            region_id,
1492            RegionEngineWithStatus::Registering(engine.clone()),
1493        );
1494
1495        let err = mock_region_server
1496            .handle_request(
1497                region_id,
1498                RegionRequest::Truncate(RegionTruncateRequest::All),
1499            )
1500            .await
1501            .unwrap_err();
1502
1503        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1504    }
1505
1506    #[tokio::test]
1507    async fn test_region_request_failed() {
1508        common_telemetry::init_default_ut_logging();
1509
1510        let mut mock_region_server = mock_region_server();
1511        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1512            MITO_ENGINE_NAME,
1513            Box::new(|_region_id, _request| {
1514                error::UnexpectedSnafu {
1515                    violated: "test".to_string(),
1516                }
1517                .fail()
1518            }),
1519        );
1520
1521        mock_region_server.register_engine(engine.clone());
1522
1523        let region_id = RegionId::new(1, 1);
1524        let builder = CreateRequestBuilder::new();
1525        let create_req = builder.build();
1526        mock_region_server
1527            .handle_request(region_id, RegionRequest::Create(create_req))
1528            .await
1529            .unwrap_err();
1530
1531        let status = mock_region_server.inner.region_map.get(&region_id);
1532        assert!(status.is_none());
1533
1534        mock_region_server
1535            .inner
1536            .region_map
1537            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1538
1539        mock_region_server
1540            .handle_request(
1541                region_id,
1542                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1543            )
1544            .await
1545            .unwrap_err();
1546
1547        let status = mock_region_server.inner.region_map.get(&region_id);
1548        assert!(status.is_some());
1549    }
1550
1551    #[tokio::test]
1552    async fn test_batch_open_region_ignore_nonexistent_regions() {
1553        common_telemetry::init_default_ut_logging();
1554        let mut mock_region_server = mock_region_server();
1555        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1556            MITO_ENGINE_NAME,
1557            Box::new(|region_id, _request| {
1558                if region_id == RegionId::new(1, 1) {
1559                    error::RegionNotFoundSnafu { region_id }.fail()
1560                } else {
1561                    Ok(0)
1562                }
1563            }),
1564        );
1565        mock_region_server.register_engine(engine.clone());
1566
1567        let region_ids = mock_region_server
1568            .handle_batch_open_requests(
1569                8,
1570                vec![
1571                    (
1572                        RegionId::new(1, 1),
1573                        RegionOpenRequest {
1574                            engine: MITO_ENGINE_NAME.to_string(),
1575                            table_dir: String::new(),
1576                            path_type: PathType::Bare,
1577                            options: Default::default(),
1578                            skip_wal_replay: false,
1579                            checkpoint: None,
1580                        },
1581                    ),
1582                    (
1583                        RegionId::new(1, 2),
1584                        RegionOpenRequest {
1585                            engine: MITO_ENGINE_NAME.to_string(),
1586                            table_dir: String::new(),
1587                            path_type: PathType::Bare,
1588                            options: Default::default(),
1589                            skip_wal_replay: false,
1590                            checkpoint: None,
1591                        },
1592                    ),
1593                ],
1594                true,
1595            )
1596            .await
1597            .unwrap();
1598        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1599
1600        let err = mock_region_server
1601            .handle_batch_open_requests(
1602                8,
1603                vec![
1604                    (
1605                        RegionId::new(1, 1),
1606                        RegionOpenRequest {
1607                            engine: MITO_ENGINE_NAME.to_string(),
1608                            table_dir: String::new(),
1609                            path_type: PathType::Bare,
1610                            options: Default::default(),
1611                            skip_wal_replay: false,
1612                            checkpoint: None,
1613                        },
1614                    ),
1615                    (
1616                        RegionId::new(1, 2),
1617                        RegionOpenRequest {
1618                            engine: MITO_ENGINE_NAME.to_string(),
1619                            table_dir: String::new(),
1620                            path_type: PathType::Bare,
1621                            options: Default::default(),
1622                            skip_wal_replay: false,
1623                            checkpoint: None,
1624                        },
1625                    ),
1626                ],
1627                false,
1628            )
1629            .await
1630            .unwrap_err();
1631        assert_eq!(err.status_code(), StatusCode::Unexpected);
1632    }
1633
1634    struct CurrentEngineTest {
1635        region_id: RegionId,
1636        current_region_status: Option<RegionEngineWithStatus>,
1637        region_change: RegionChange,
1638        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1639    }
1640
1641    #[tokio::test]
1642    async fn test_current_engine() {
1643        common_telemetry::init_default_ut_logging();
1644
1645        let mut mock_region_server = mock_region_server();
1646        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1647        mock_region_server.register_engine(engine.clone());
1648
1649        let region_id = RegionId::new(1024, 1);
1650        let tests = vec![
1651            // RegionChange::None
1652            CurrentEngineTest {
1653                region_id,
1654                current_region_status: None,
1655                region_change: RegionChange::None,
1656                assert: Box::new(|result| {
1657                    let err = result.unwrap_err();
1658                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1659                }),
1660            },
1661            CurrentEngineTest {
1662                region_id,
1663                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1664                region_change: RegionChange::None,
1665                assert: Box::new(|result| {
1666                    let current_engine = result.unwrap();
1667                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1668                }),
1669            },
1670            CurrentEngineTest {
1671                region_id,
1672                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1673                region_change: RegionChange::None,
1674                assert: Box::new(|result| {
1675                    let err = result.unwrap_err();
1676                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1677                }),
1678            },
1679            CurrentEngineTest {
1680                region_id,
1681                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1682                region_change: RegionChange::None,
1683                assert: Box::new(|result| {
1684                    let err = result.unwrap_err();
1685                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1686                }),
1687            },
1688            // RegionChange::Register
1689            CurrentEngineTest {
1690                region_id,
1691                current_region_status: None,
1692                region_change: RegionChange::Register(RegionAttribute::Mito),
1693                assert: Box::new(|result| {
1694                    let current_engine = result.unwrap();
1695                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1696                }),
1697            },
1698            CurrentEngineTest {
1699                region_id,
1700                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1701                region_change: RegionChange::Register(RegionAttribute::Mito),
1702                assert: Box::new(|result| {
1703                    let current_engine = result.unwrap();
1704                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1705                }),
1706            },
1707            CurrentEngineTest {
1708                region_id,
1709                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1710                region_change: RegionChange::Register(RegionAttribute::Mito),
1711                assert: Box::new(|result| {
1712                    let err = result.unwrap_err();
1713                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1714                }),
1715            },
1716            CurrentEngineTest {
1717                region_id,
1718                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1719                region_change: RegionChange::Register(RegionAttribute::Mito),
1720                assert: Box::new(|result| {
1721                    let current_engine = result.unwrap();
1722                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1723                }),
1724            },
1725            // RegionChange::Deregister
1726            CurrentEngineTest {
1727                region_id,
1728                current_region_status: None,
1729                region_change: RegionChange::Deregisters,
1730                assert: Box::new(|result| {
1731                    let current_engine = result.unwrap();
1732                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1733                }),
1734            },
1735            CurrentEngineTest {
1736                region_id,
1737                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1738                region_change: RegionChange::Deregisters,
1739                assert: Box::new(|result| {
1740                    let err = result.unwrap_err();
1741                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
1742                }),
1743            },
1744            CurrentEngineTest {
1745                region_id,
1746                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1747                region_change: RegionChange::Deregisters,
1748                assert: Box::new(|result| {
1749                    let current_engine = result.unwrap();
1750                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1751                }),
1752            },
1753            CurrentEngineTest {
1754                region_id,
1755                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1756                region_change: RegionChange::Deregisters,
1757                assert: Box::new(|result| {
1758                    let current_engine = result.unwrap();
1759                    assert_matches!(current_engine, CurrentEngine::Engine(_));
1760                }),
1761            },
1762        ];
1763
1764        for test in tests {
1765            let CurrentEngineTest {
1766                region_id,
1767                current_region_status,
1768                region_change,
1769                assert,
1770            } = test;
1771
1772            // Sets up
1773            if let Some(status) = current_region_status {
1774                mock_region_server
1775                    .inner
1776                    .region_map
1777                    .insert(region_id, status);
1778            } else {
1779                mock_region_server.inner.region_map.remove(&region_id);
1780            }
1781
1782            let result = mock_region_server
1783                .inner
1784                .get_engine(region_id, &region_change);
1785
1786            assert(result);
1787        }
1788    }
1789
1790    #[tokio::test]
1791    async fn test_region_server_parallelism() {
1792        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1793        let first_query = p.acquire().await;
1794        assert!(first_query.is_ok());
1795        let second_query = p.acquire().await;
1796        assert!(second_query.is_ok());
1797        let third_query = p.acquire().await;
1798        assert!(third_query.is_err());
1799        let err = third_query.unwrap_err();
1800        assert_eq!(
1801            err.output_msg(),
1802            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1803        );
1804        drop(first_query);
1805        let forth_query = p.acquire().await;
1806        assert!(forth_query.is_ok());
1807    }
1808
1809    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1810        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1811        metadata_builder.push_column_metadata(ColumnMetadata {
1812            column_schema: datatypes::schema::ColumnSchema::new(
1813                "timestamp",
1814                ConcreteDataType::timestamp_nanosecond_datatype(),
1815                false,
1816            ),
1817            semantic_type: SemanticType::Timestamp,
1818            column_id: 0,
1819        });
1820        metadata_builder.push_column_metadata(ColumnMetadata {
1821            column_schema: datatypes::schema::ColumnSchema::new(
1822                "file",
1823                ConcreteDataType::string_datatype(),
1824                true,
1825            ),
1826            semantic_type: SemanticType::Tag,
1827            column_id: 1,
1828        });
1829        metadata_builder.push_column_metadata(ColumnMetadata {
1830            column_schema: datatypes::schema::ColumnSchema::new(
1831                "message",
1832                ConcreteDataType::string_datatype(),
1833                true,
1834            ),
1835            semantic_type: SemanticType::Field,
1836            column_id: 2,
1837        });
1838        metadata_builder.primary_key(vec![1]);
1839        metadata_builder.build().unwrap()
1840    }
1841
1842    #[tokio::test]
1843    async fn test_handle_list_metadata_request() {
1844        common_telemetry::init_default_ut_logging();
1845
1846        let mut mock_region_server = mock_region_server();
1847        let region_id_1 = RegionId::new(1, 0);
1848        let region_id_2 = RegionId::new(2, 0);
1849
1850        let metadata_1 = mock_region_metadata(region_id_1);
1851        let metadata_2 = mock_region_metadata(region_id_2);
1852        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
1853
1854        let metadata_1 = Arc::new(metadata_1);
1855        let metadata_2 = Arc::new(metadata_2);
1856        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1857            MITO_ENGINE_NAME,
1858            Box::new(move |region_id| {
1859                if region_id == region_id_1 {
1860                    Ok(metadata_1.clone())
1861                } else if region_id == region_id_2 {
1862                    Ok(metadata_2.clone())
1863                } else {
1864                    error::RegionNotFoundSnafu { region_id }.fail()
1865                }
1866            }),
1867        );
1868
1869        mock_region_server.register_engine(engine.clone());
1870        mock_region_server
1871            .inner
1872            .region_map
1873            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1874        mock_region_server
1875            .inner
1876            .region_map
1877            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1878
1879        // All regions exist.
1880        let list_metadata_request = ListMetadataRequest {
1881            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1882        };
1883        let response = mock_region_server
1884            .handle_list_metadata_request(&list_metadata_request)
1885            .await
1886            .unwrap();
1887        let decoded_metadata: Vec<Option<RegionMetadata>> =
1888            serde_json::from_slice(&response.metadata).unwrap();
1889        assert_eq!(metadatas, decoded_metadata);
1890    }
1891
1892    #[tokio::test]
1893    async fn test_handle_list_metadata_not_found() {
1894        common_telemetry::init_default_ut_logging();
1895
1896        let mut mock_region_server = mock_region_server();
1897        let region_id_1 = RegionId::new(1, 0);
1898        let region_id_2 = RegionId::new(2, 0);
1899
1900        let metadata_1 = mock_region_metadata(region_id_1);
1901        let metadatas = vec![Some(metadata_1.clone()), None];
1902
1903        let metadata_1 = Arc::new(metadata_1);
1904        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1905            MITO_ENGINE_NAME,
1906            Box::new(move |region_id| {
1907                if region_id == region_id_1 {
1908                    Ok(metadata_1.clone())
1909                } else {
1910                    error::RegionNotFoundSnafu { region_id }.fail()
1911                }
1912            }),
1913        );
1914
1915        mock_region_server.register_engine(engine.clone());
1916        mock_region_server
1917            .inner
1918            .region_map
1919            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1920
1921        // Not in region map.
1922        let list_metadata_request = ListMetadataRequest {
1923            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1924        };
1925        let response = mock_region_server
1926            .handle_list_metadata_request(&list_metadata_request)
1927            .await
1928            .unwrap();
1929        let decoded_metadata: Vec<Option<RegionMetadata>> =
1930            serde_json::from_slice(&response.metadata).unwrap();
1931        assert_eq!(metadatas, decoded_metadata);
1932
1933        // Not in region engine.
1934        mock_region_server
1935            .inner
1936            .region_map
1937            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1938        let response = mock_region_server
1939            .handle_list_metadata_request(&list_metadata_request)
1940            .await
1941            .unwrap();
1942        let decoded_metadata: Vec<Option<RegionMetadata>> =
1943            serde_json::from_slice(&response.metadata).unwrap();
1944        assert_eq!(metadatas, decoded_metadata);
1945    }
1946
1947    #[tokio::test]
1948    async fn test_handle_list_metadata_failed() {
1949        common_telemetry::init_default_ut_logging();
1950
1951        let mut mock_region_server = mock_region_server();
1952        let region_id_1 = RegionId::new(1, 0);
1953
1954        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1955            MITO_ENGINE_NAME,
1956            Box::new(move |region_id| {
1957                error::UnexpectedSnafu {
1958                    violated: format!("Failed to get region {region_id}"),
1959                }
1960                .fail()
1961            }),
1962        );
1963
1964        mock_region_server.register_engine(engine.clone());
1965        mock_region_server
1966            .inner
1967            .region_map
1968            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1969
1970        // Failed to get.
1971        let list_metadata_request = ListMetadataRequest {
1972            region_ids: vec![region_id_1.as_u64()],
1973        };
1974        mock_region_server
1975            .handle_list_metadata_request(&list_metadata_request)
1976            .await
1977            .unwrap_err();
1978    }
1979}