Skip to main content

datanode/
region_server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod catalog;
16mod registrations;
17mod remote_dyn_filter;
18
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::ops::Deref;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::{Arc, RwLock};
25use std::task::{Context, Poll};
26use std::time::Duration;
27
28use api::region::RegionResponse;
29use api::v1::meta::TopicStat;
30use api::v1::region::sync_request::ManifestInfo;
31use api::v1::region::{
32    ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
33};
34use api::v1::{ResponseHeader, Status};
35use arrow_flight::{FlightData, Ticket};
36use async_trait::async_trait;
37use bytes::Bytes;
38use common_error::ext::{BoxedError, ErrorExt};
39use common_error::status_code::StatusCode;
40use common_meta::datanode::TopicStatsReporter;
41use common_query::OutputData;
42use common_query::request::QueryRequest;
43use common_recordbatch::adapter::RecordBatchMetrics;
44use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
45use common_runtime::Runtime;
46use common_telemetry::tracing::{self, info_span};
47use common_telemetry::tracing_context::{FutureExt, TracingContext};
48use common_telemetry::{debug, error, info, warn};
49use dashmap::DashMap;
50use datafusion::datasource::TableProvider;
51use datafusion_common::tree_node::TreeNode;
52use datatypes::schema::SchemaRef;
53use either::Either;
54use futures_util::future::try_join_all;
55use futures_util::{Stream, StreamExt};
56use metric_engine::engine::MetricEngine;
57use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
58use prost::Message;
59use query::QueryEngineRef;
60pub use query::dummy_catalog::{
61    DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
62};
63use query::options::should_collect_region_watermark_from_extensions;
64use serde_json;
65use servers::error::{
66    self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
67};
68use servers::grpc::FlightCompression;
69use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
70use servers::grpc::region_server::RegionServerHandler;
71use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
72use snafu::{OptionExt, ResultExt, ensure};
73use store_api::metric_engine_consts::{
74    FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
75};
76use store_api::region_engine::{
77    RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
78    RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
79    SyncRegionFromRequest,
80};
81use store_api::region_request::{
82    AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
83    RegionOpenRequest, RegionRequest,
84};
85use store_api::storage::RegionId;
86use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};
87use tokio::time::timeout;
88use tonic::{Request, Response, Result as TonicResult};
89
90use crate::error::{
91    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
92    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
93    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
94    HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
95    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
96    Result, RuntimeJoinSnafu, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu,
97    UnsupportedOutputSnafu,
98};
99use crate::event_listener::RegionServerEventListenerRef;
100use crate::query_stream::QueryRuntimeStream;
101use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
102use crate::region_server::registrations::RemoteDynFilterRegistry;
103use crate::region_server::remote_dyn_filter::wrap_remote_dyn_filter_guarded_stream;
104
/// Buffer size associated with the query runtime stream.
///
/// NOTE(review): the constant is not referenced in this part of the file; presumably it bounds
/// the buffering done by `QueryRuntimeStream` — confirm at the use site.
const QUERY_RUNTIME_STREAM_BUFFER_SIZE: usize = 8;
106
/// Cloneable handle to the datanode's region server.
///
/// Clones share the same [`RegionServerInner`] and the same suspend flag, because both are
/// held behind an [`Arc`].
#[derive(Clone)]
pub struct RegionServer {
    /// Shared state that the request-handling methods delegate to.
    inner: Arc<RegionServerInner>,
    /// Flight compression setting supplied at construction time.
    flight_compression: FlightCompression,
    /// Suspend flag; read by `is_suspended` and handed out by `suspend_state`.
    suspend: Arc<AtomicBool>,
}
113
/// Summary of a single region, as produced by [`RegionServer::reportable_regions`].
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine the region is registered with.
    pub engine: String,
    /// Role reported for the region.
    pub role: RegionRole,
}
119
120impl RegionServer {
121    pub fn new(
122        query_engine: QueryEngineRef,
123        runtime: Runtime,
124        event_listener: RegionServerEventListenerRef,
125        flight_compression: FlightCompression,
126    ) -> Self {
127        Self::with_table_provider(
128            query_engine,
129            runtime,
130            event_listener,
131            Arc::new(DummyTableProviderFactory),
132            0,
133            Duration::from_millis(0),
134            flight_compression,
135        )
136    }
137
138    pub fn with_table_provider(
139        query_engine: QueryEngineRef,
140        runtime: Runtime,
141        event_listener: RegionServerEventListenerRef,
142        table_provider_factory: TableProviderFactoryRef,
143        max_concurrent_queries: usize,
144        concurrent_query_limiter_timeout: Duration,
145        flight_compression: FlightCompression,
146    ) -> Self {
147        Self {
148            inner: Arc::new(RegionServerInner::new(
149                query_engine,
150                runtime,
151                event_listener,
152                table_provider_factory,
153                RegionServerParallelism::from_opts(
154                    max_concurrent_queries,
155                    concurrent_query_limiter_timeout,
156                ),
157            )),
158            flight_compression,
159            suspend: Arc::new(AtomicBool::new(false)),
160        }
161    }
162
    /// Registers an engine.
    ///
    /// Forwards `engine` to the inner server's `register_engine`.
    pub fn register_engine(&mut self, engine: RegionEngineRef) {
        self.inner.register_engine(engine);
    }
167
    /// Sets the topic stats reporter.
    ///
    /// Forwards `topic_stats_reporter` to the inner server; it is the reporter that
    /// [`RegionServer::topic_stats`] reads from.
    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
        self.inner.set_topic_stats_reporter(topic_stats_reporter);
    }
172
173    /// Finds the region's engine by its id. If the region is not ready, returns `None`.
174    pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
175        match self.inner.get_engine(region_id, &RegionChange::None) {
176            Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
177            Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
178            Err(error::Error::RegionNotFound { .. }) => Ok(None),
179            Err(err) => Err(err),
180        }
181    }
182
183    /// Gets the MitoEngine if it's registered.
184    pub fn mito_engine(&self) -> Option<MitoEngine> {
185        if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
186            Some(mito)
187        } else {
188            self.inner
189                .engines
190                .read()
191                .unwrap()
192                .get(MITO_ENGINE_NAME)
193                .cloned()
194                .and_then(|e| {
195                    let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
196                    if mito.is_none() {
197                        warn!("Mito engine not found in region server engines");
198                    }
199                    mito
200                })
201        }
202    }
203
    /// Handles a batch of region open requests.
    ///
    /// `parallelism`, `requests` and `ignore_nonexistent_region` are forwarded unchanged to
    /// the inner server, whose result (a list of region ids) is returned as is.
    /// NOTE(review): presumably the returned ids are the regions that were opened — confirm
    /// against the inner implementation.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
        ignore_nonexistent_region: bool,
    ) -> Result<Vec<RegionId>> {
        self.inner
            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
            .await
    }
215
    /// Handles a batch of region catchup requests.
    ///
    /// Both arguments are forwarded unchanged to the inner server. The result pairs each
    /// region id with its own `Result`, so individual outcomes are reported per region
    /// while the outer `Result` covers the batch call itself.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
    }
226
227    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
228    pub async fn handle_request(
229        &self,
230        region_id: RegionId,
231        request: RegionRequest,
232    ) -> Result<RegionResponse> {
233        if RegionServerInner::is_ingest_request(&request) {
234            let inner = self.inner.clone();
235            let request_type = request.request_type();
236            return common_runtime::spawn_ingest(async move {
237                inner.handle_request(region_id, request).await
238            })
239            .await
240            .context(RuntimeJoinSnafu { request_type })?;
241        }
242
243        self.inner.handle_request(region_id, request).await
244    }
245
246    /// Returns a table provider for the region. Will set snapshot sequence if available in the context.
247    async fn table_provider(
248        &self,
249        region_id: RegionId,
250        ctx: Option<QueryContextRef>,
251    ) -> Result<Arc<dyn TableProvider>> {
252        let status = self
253            .inner
254            .region_map
255            .get(&region_id)
256            .context(RegionNotFoundSnafu { region_id })?
257            .clone();
258        ensure!(
259            matches!(status, RegionEngineWithStatus::Ready(_)),
260            RegionNotReadySnafu { region_id }
261        );
262
263        let provider = self
264            .inner
265            .table_provider_factory
266            .create(region_id, status.into_engine(), ctx)
267            .await
268            .context(ExecuteLogicalPlanSnafu)?;
269
270        Ok(provider)
271    }
272
    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
    ///
    /// Thin public entry point: forwards `request` and `query_ctx` unchanged to
    /// `handle_remote_read_inner` and returns its stream.
    pub async fn handle_remote_read(
        &self,
        request: api::v1::region::QueryRequest,
        query_ctx: QueryContextRef,
    ) -> Result<SendableRecordBatchStream> {
        self.handle_remote_read_inner(request, query_ctx).await
    }
281
282    async fn handle_remote_read_inner(
283        &self,
284        request: api::v1::region::QueryRequest,
285        query_ctx: QueryContextRef,
286    ) -> Result<SendableRecordBatchStream> {
287        let permit = if let Some(p) = &self.inner.parallelism {
288            Some(p.acquire().await?)
289        } else {
290            None
291        };
292
293        let region_id = RegionId::from_u64(request.region_id);
294        let catalog_list = Arc::new(NameAwareCatalogList::new(
295            self.clone(),
296            region_id,
297            query_ctx.clone(),
298        ));
299
300        if query_ctx.explain_verbose() {
301            common_telemetry::info!("Handle remote read for region: {}", region_id);
302        }
303
304        let decoder = self
305            .inner
306            .query_engine
307            .engine_context(query_ctx.clone())
308            .new_plan_decoder()
309            .context(NewPlanDecoderSnafu)?;
310
311        let plan = decoder
312            .decode(Bytes::from(request.plan), catalog_list, false)
313            .await
314            .context(DecodeLogicalPlanSnafu)?;
315
316        let cleanup = self.register_initial_remote_dyn_filter_cleanup(&query_ctx, region_id);
317
318        let stream = self
319            .inner
320            .handle_read(
321                QueryRequest {
322                    header: request.header,
323                    region_id,
324                    plan,
325                },
326                query_ctx.clone(),
327            )
328            .await?;
329
330        let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
331        let stream = if let Some(cleanup) = cleanup {
332            wrap_remote_dyn_filter_guarded_stream(stream, cleanup)
333        } else {
334            stream
335        };
336        Ok(maybe_guard_stream(stream, permit))
337    }
338
    /// Handles a read request whose plan is already decoded.
    ///
    /// Thin public entry point: forwards `request` unchanged to `handle_read_inner` and
    /// returns its stream.
    #[tracing::instrument(skip_all)]
    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
        self.handle_read_inner(request).await
    }
343
344    async fn handle_read_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
345        let permit = if let Some(p) = &self.inner.parallelism {
346            Some(p.acquire().await?)
347        } else {
348            None
349        };
350
351        let ctx = request.header.as_ref().map(|h| h.into());
352        let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
353
354        let region_id = request.region_id;
355        let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
356            .context(DataFusionSnafu)?;
357        let mut injector = injector_builder
358            .build(self, request.region_id, query_ctx.clone())
359            .await?;
360
361        let plan = request
362            .plan
363            .rewrite(&mut injector)
364            .context(DataFusionSnafu)?
365            .data;
366
367        let cleanup = self.register_initial_remote_dyn_filter_cleanup(&query_ctx, region_id);
368
369        let stream = self
370            .inner
371            .handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
372            .await?;
373
374        let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
375        let stream = if let Some(cleanup) = cleanup {
376            wrap_remote_dyn_filter_guarded_stream(stream, cleanup)
377        } else {
378            stream
379        };
380        Ok(maybe_guard_stream(stream, permit))
381    }
382
383    /// Returns all opened and reportable regions.
384    ///
385    /// Notes: except all metrics regions.
386    pub fn reportable_regions(&self) -> Vec<RegionStat> {
387        self.inner
388            .region_map
389            .iter()
390            .filter_map(|e| {
391                let region_id = *e.key();
392                // Filters out any regions whose role equals None.
393                e.role(region_id).map(|role| RegionStat {
394                    region_id,
395                    engine: e.value().name().to_string(),
396                    role,
397                })
398            })
399            .collect()
400    }
401
402    /// Returns the reportable topics.
403    pub fn topic_stats(&self) -> Vec<TopicStat> {
404        let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
405        let Some(reporter) = reporter.as_mut() else {
406            return vec![];
407        };
408        reporter
409            .reportable_topics()
410            .into_iter()
411            .map(|stat| TopicStat {
412                topic_name: stat.topic,
413                record_size: stat.record_size,
414                record_num: stat.record_num,
415                latest_entry_id: stat.latest_entry_id,
416            })
417            .collect()
418    }
419
420    pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
421        self.inner.region_map.get(&region_id).and_then(|engine| {
422            engine.role(region_id).map(|role| match role {
423                RegionRole::Follower => false,
424                RegionRole::Leader => true,
425                RegionRole::StagingLeader => true,
426                RegionRole::DowngradingLeader => true,
427            })
428        })
429    }
430
431    pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
432        let engine = self
433            .inner
434            .region_map
435            .get(&region_id)
436            .with_context(|| RegionNotFoundSnafu { region_id })?;
437        engine
438            .set_region_role(region_id, role)
439            .with_context(|_| HandleRegionRequestSnafu { region_id })
440    }
441
442    /// Set region role state gracefully.
443    ///
444    /// For [SettableRegionRoleState::Follower]:
445    /// After the call returns, the engine ensures that
446    /// no **further** write or flush operations will succeed in this region.
447    ///
448    /// For [SettableRegionRoleState::DowngradingLeader]:
449    /// After the call returns, the engine ensures that
450    /// no **further** write operations will succeed in this region.
451    pub async fn set_region_role_state_gracefully(
452        &self,
453        region_id: RegionId,
454        state: SettableRegionRoleState,
455    ) -> Result<SetRegionRoleStateResponse> {
456        match self.inner.region_map.get(&region_id) {
457            Some(engine) => Ok(engine
458                .set_region_role_state_gracefully(region_id, state)
459                .await
460                .with_context(|_| HandleRegionRequestSnafu { region_id })?),
461            None => Ok(SetRegionRoleStateResponse::NotFound),
462        }
463    }
464
    /// Returns a clone of the runtime held by the inner server.
    pub fn runtime(&self) -> Runtime {
        self.inner.runtime.clone()
    }
468
469    pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
470        match self.inner.region_map.get(&region_id) {
471            Some(e) => e.region_statistic(region_id),
472            None => None,
473        }
474    }
475
    /// Stop the region server.
    ///
    /// Delegates to the inner server's `stop` and returns its result.
    pub async fn stop(&self) -> Result<()> {
        self.inner.stop().await
    }
480
481    #[cfg(test)]
482    /// Registers a region for test purpose.
483    pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
484        {
485            let mut engines = self.inner.engines.write().unwrap();
486            if !engines.contains_key(engine.name()) {
487                debug!("Registering test engine: {}", engine.name());
488                engines.insert(engine.name().to_string(), engine.clone());
489            }
490        }
491
492        self.inner
493            .region_map
494            .insert(region_id, RegionEngineWithStatus::Ready(engine));
495    }
496
497    async fn handle_batch_ddl_requests(
498        &self,
499        request: region_request::Body,
500    ) -> Result<RegionResponse> {
501        // Safety: we have already checked the request type in `RegionServer::handle()`.
502        let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
503            .context(BuildRegionRequestsSnafu)?
504            .unwrap();
505        let tracing_context = TracingContext::from_current_span();
506
507        let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
508        self.inner
509            .handle_batch_request(batch_request)
510            .trace(span)
511            .await
512    }
513
514    async fn handle_requests_in_parallel(
515        &self,
516        request: region_request::Body,
517    ) -> Result<RegionResponse> {
518        let requests =
519            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
520
521        // Try to optimize batch Put requests for metric engine
522        // Returns either Some(response) or None(requests_back)
523        match self.try_handle_metric_batch_puts(requests).await? {
524            Either::Left(response) => Ok(response),
525            Either::Right(requests) => {
526                // Fallback: original parallel processing
527                let tracing_context = TracingContext::from_current_span();
528                let join_tasks =
529                    requests
530                        .into_iter()
531                        .map(|(region_id, req): (RegionId, RegionRequest)| {
532                            let self_to_move = self;
533                            let span = tracing_context.attach(info_span!(
534                                "RegionServer::handle_region_request",
535                                region_id = region_id.to_string()
536                            ));
537                            async move {
538                                self_to_move
539                                    .handle_request(region_id, req)
540                                    .trace(span)
541                                    .await
542                            }
543                        });
544
545                let results = try_join_all(join_tasks).await?;
546                let mut affected_rows = 0;
547                let mut extensions = HashMap::new();
548                for result in results {
549                    affected_rows += result.affected_rows;
550                    extensions.extend(result.extensions);
551                }
552
553                Ok(RegionResponse {
554                    affected_rows,
555                    extensions,
556                    metadata: Vec::new(),
557                })
558            }
559        }
560    }
561
562    async fn handle_requests_in_serial(
563        &self,
564        request: region_request::Body,
565    ) -> Result<RegionResponse> {
566        let requests =
567            RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
568        let tracing_context = TracingContext::from_current_span();
569
570        let mut affected_rows = 0;
571        let mut extensions = HashMap::new();
572        for (region_id, req) in requests {
573            let span = tracing_context.attach(info_span!(
574                "RegionServer::handle_region_request",
575                region_id = region_id.to_string()
576            ));
577            let result = self.handle_request(region_id, req).trace(span).await?;
578
579            affected_rows += result.affected_rows;
580            extensions.extend(result.extensions);
581        }
582
583        Ok(RegionResponse {
584            affected_rows,
585            extensions,
586            metadata: Vec::new(),
587        })
588    }
589
590    /// Attempts to optimize batch Put requests for metric engine.
591    ///
592    /// Returns Either::Left(response) if optimization succeeded,
593    /// or Either::Right(original_requests) to fall back to parallel processing.
594    ///
595    /// This avoids cloning requests when optimization cannot be applied.
596    async fn try_handle_metric_batch_puts(
597        &self,
598        requests: Vec<(RegionId, RegionRequest)>,
599    ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
600        if requests.is_empty() {
601            return Ok(Either::Right(requests));
602        }
603
604        // Quick check: verify first request is Put and is metric engine
605        if !matches!(requests[0].1, RegionRequest::Put(_)) {
606            return Ok(Either::Right(requests));
607        }
608        let first_region_id = requests[0].0;
609        let request_type = requests[0].1.request_type();
610
611        // SAFETY: If the first request belongs to metric engine, then ALL requests
612        // in this batch are guaranteed to belong to metric engine. This invariant
613        // is maintained by the request batching logic upstream.
614        let engine = match self
615            .inner
616            .get_engine(first_region_id, &RegionChange::None)?
617        {
618            CurrentEngine::Engine(e) => e,
619            _ => return Ok(Either::Right(requests)),
620        };
621
622        if engine.name() != METRIC_ENGINE_NAME {
623            return Ok(Either::Right(requests));
624        }
625
626        // Check if ALL requests are Put (now we know it's worth checking)
627        let mut all_puts = true;
628        for (_, req) in &requests {
629            if !matches!(req, RegionRequest::Put(_)) {
630                all_puts = false;
631                break;
632            }
633        }
634
635        if !all_puts {
636            return Ok(Either::Right(requests));
637        }
638
639        // Now extract Put requests by consuming ownership (zero clone!)
640        let put_requests = requests.into_iter().map(|(region_id, req)| {
641            if let RegionRequest::Put(put) = req {
642                (region_id, put)
643            } else {
644                unreachable!("Already checked all are Put")
645            }
646        });
647
648        // Downcast to MetricEngine and call batch API
649        let metric_engine = engine
650            .as_any()
651            .downcast_ref::<MetricEngine>()
652            .context(UnexpectedSnafu {
653                violated: "Failed to downcast to MetricEngine",
654            })?
655            .clone();
656
657        let tracing_context = TracingContext::from_current_span();
658        let batch_size = put_requests.len();
659        let span = tracing_context.attach(info_span!(
660            "RegionServer::handle_metric_batch_puts",
661            batch_size = batch_size,
662        ));
663        let result = common_runtime::spawn_ingest(async move {
664            metric_engine
665                .put_regions_batch(put_requests)
666                .trace(span)
667                .await
668        })
669        .await
670        .context(RuntimeJoinSnafu { request_type })?
671        .map_err(BoxedError::new)
672        .context(HandleRegionRequestSnafu {
673            region_id: first_region_id,
674        });
675
676        match result {
677            Ok(total_affected) => {
678                crate::metrics::REGION_CHANGED_ROW_COUNT
679                    .with_label_values(&[request_type])
680                    .inc_by(total_affected as u64);
681                Ok(Either::Left(RegionResponse::new(total_affected)))
682            }
683            Err(err) => {
684                crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
685                    .with_label_values(&[request_type])
686                    .inc_by(batch_size as u64);
687                Err(err)
688            }
689        }
690    }
691
692    async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
693        let region_id = RegionId::from_u64(request.region_id);
694        let manifest_info = request
695            .manifest_info
696            .context(error::MissingRequiredFieldSnafu {
697                name: "manifest_info",
698            })?;
699
700        let manifest_info = match manifest_info {
701            ManifestInfo::MitoManifestInfo(info) => {
702                RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
703            }
704            ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
705                info.data_manifest_version,
706                0,
707                info.metadata_manifest_version,
708                0,
709            ),
710        };
711
712        let tracing_context = TracingContext::from_current_span();
713        let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
714
715        self.sync_region(
716            region_id,
717            SyncRegionFromRequest::from_manifest(manifest_info),
718        )
719        .trace(span)
720        .await
721        .map(|_| RegionResponse::new(AffectedRows::default()))
722    }
723
724    /// Handles the ListMetadata request and retrieves metadata for specified regions.
725    ///
726    /// Returns the results as a JSON-serialized list in the [RegionResponse]. It serializes
727    /// non-existing regions as `null`.
728    #[tracing::instrument(skip_all)]
729    async fn handle_list_metadata_request(
730        &self,
731        request: &ListMetadataRequest,
732    ) -> Result<RegionResponse> {
733        let mut region_metadatas = Vec::new();
734        // Collect metadata for each region
735        for region_id in &request.region_ids {
736            let region_id = RegionId::from_u64(*region_id);
737            // Get the engine.
738            let Some(engine) = self.find_engine(region_id)? else {
739                region_metadatas.push(None);
740                continue;
741            };
742
743            match engine.get_metadata(region_id).await {
744                Ok(metadata) => region_metadatas.push(Some(metadata)),
745                Err(err) => {
746                    if err.status_code() == StatusCode::RegionNotFound {
747                        region_metadatas.push(None);
748                    } else {
749                        Err(err).with_context(|_| GetRegionMetadataSnafu {
750                            engine: engine.name(),
751                            region_id,
752                        })?;
753                    }
754                }
755            }
756        }
757
758        // Serialize metadata to JSON
759        let json_result = serde_json::to_vec(&region_metadatas).context(SerializeJsonSnafu)?;
760
761        let response = RegionResponse::from_metadata(json_result);
762
763        Ok(response)
764    }
765
766    /// Sync region manifest and registers new opened logical regions.
767    pub async fn sync_region(
768        &self,
769        region_id: RegionId,
770        request: SyncRegionFromRequest,
771    ) -> Result<()> {
772        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
773            CurrentEngine::Engine(engine) => engine,
774            _ => {
775                return UnexpectedSnafu {
776                    violated: "unexpected EarlyReturn engine status for a ready region",
777                }
778                .fail();
779            }
780        };
781
782        self.inner
783            .handle_sync_region(&engine, region_id, request)
784            .await
785    }
786
787    /// Remaps manifests from old regions to new regions.
788    pub async fn remap_manifests(
789        &self,
790        request: RemapManifestsRequest,
791    ) -> Result<RemapManifestsResponse> {
792        let region_id = request.region_id;
793        let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
794            CurrentEngine::Engine(engine) => engine,
795            _ => {
796                return UnexpectedSnafu {
797                    violated: "unexpected EarlyReturn engine status for a ready region",
798                }
799                .fail();
800            }
801        };
802
803        engine
804            .remap_manifests(request)
805            .await
806            .with_context(|_| HandleRegionRequestSnafu { region_id })
807    }
808
    /// Returns the current value of the suspend flag.
    ///
    /// The flag is read with `Ordering::Relaxed`, so the load itself imposes no ordering
    /// on other memory accesses.
    fn is_suspended(&self) -> bool {
        self.suspend.load(Ordering::Relaxed)
    }
812
    /// Returns a shared handle to the suspend flag.
    ///
    /// The returned `Arc` points at the same `AtomicBool` that `is_suspended` reads, so a
    /// store through it is visible to this server and its clones.
    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }
816}
817
818fn wrap_flow_region_watermark_stream(
819    stream: SendableRecordBatchStream,
820    region_id: RegionId,
821    query_ctx: &QueryContextRef,
822) -> SendableRecordBatchStream {
823    if should_collect_region_watermark_from_extensions(&query_ctx.extensions())
824        && let Some(seq) = query_ctx.get_snapshot(region_id.as_u64())
825    {
826        Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) as SendableRecordBatchStream
827    } else {
828        stream
829    }
830}
831
/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
struct RegionWatermarkStream {
    /// The wrapped stream; name, schema, ordering and base metrics are taken from it.
    stream: SendableRecordBatchStream,
    /// Region id in its `u64` form (`RegionId::as_u64`).
    region_id: u64,
    /// Snapshot sequence reported as the region's watermark.
    snapshot_seq: u64,
    /// Starts as `false`; `metrics` adds the watermark only once this is `true`.
    /// NOTE(review): presumably set when the wrapped stream is exhausted — confirm in
    /// `poll_next`.
    finished: bool,
}
839
840impl RegionWatermarkStream {
841    fn new(stream: SendableRecordBatchStream, region_id: RegionId, snapshot_seq: u64) -> Self {
842        Self {
843            stream,
844            region_id: region_id.as_u64(),
845            snapshot_seq,
846            finished: false,
847        }
848    }
849
850    fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
851        if metrics
852            .region_watermarks
853            .iter()
854            .any(|entry| entry.region_id == self.region_id)
855        {
856            return metrics;
857        }
858
859        metrics
860            .region_watermarks
861            .push(common_recordbatch::adapter::RegionWatermarkEntry {
862                region_id: self.region_id,
863                watermark: Some(self.snapshot_seq),
864            });
865        metrics
866    }
867}
868
869impl RecordBatchStream for RegionWatermarkStream {
870    fn name(&self) -> &str {
871        self.stream.name()
872    }
873
874    fn schema(&self) -> datatypes::schema::SchemaRef {
875        self.stream.schema()
876    }
877
878    fn output_ordering(&self) -> Option<&[OrderOption]> {
879        self.stream.output_ordering()
880    }
881
882    fn metrics(&self) -> Option<RecordBatchMetrics> {
883        let base = self.stream.metrics();
884        if !self.finished {
885            return base;
886        }
887
888        Some(self.merged_metrics(base.unwrap_or_default()))
889    }
890}
891
892impl Stream for RegionWatermarkStream {
893    type Item = common_recordbatch::error::Result<RecordBatch>;
894
895    fn size_hint(&self) -> (usize, Option<usize>) {
896        self.stream.size_hint()
897    }
898
899    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
900        match Pin::new(&mut self.stream).poll_next(cx) {
901            Poll::Ready(None) => {
902                self.finished = true;
903                Poll::Ready(None)
904            }
905            other => other,
906        }
907    }
908}
909
910#[async_trait]
911impl RegionServerHandler for RegionServer {
912    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
913        let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
914            .with_label_values(&[request.as_ref()]);
915        let response = match &request {
916            region_request::Body::Creates(_)
917            | region_request::Body::Drops(_)
918            | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
919            region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
920                self.handle_requests_in_parallel(request).await
921            }
922            region_request::Body::Sync(sync_request) => {
923                self.handle_sync_region_request(sync_request).await
924            }
925            region_request::Body::ListMetadata(list_metadata_request) => {
926                self.handle_list_metadata_request(list_metadata_request)
927                    .await
928            }
929            region_request::Body::RemoteDynFilter(remote_dyn_filter_request) => {
930                self.handle_remote_dyn_filter_request(remote_dyn_filter_request)
931                    .await
932            }
933            _ => self.handle_requests_in_serial(request).await,
934        }
935        .map_err(BoxedError::new)
936        .inspect_err(|_| {
937            failed_requests_cnt.inc();
938        })
939        .context(ExecuteGrpcRequestSnafu)?;
940
941        Ok(RegionResponseV1 {
942            header: Some(ResponseHeader {
943                status: Some(Status {
944                    status_code: StatusCode::Success as _,
945                    ..Default::default()
946                }),
947            }),
948            affected_rows: response.affected_rows as _,
949            extensions: response.extensions,
950            metadata: response.metadata,
951        })
952    }
953}
954
955#[async_trait]
956impl FlightCraft for RegionServer {
957    async fn do_get(
958        &self,
959        request: Request<Ticket>,
960    ) -> TonicResult<Response<TonicStream<FlightData>>> {
961        ensure!(!self.is_suspended(), SuspendedSnafu);
962
963        let ticket = request.into_inner().ticket;
964        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
965            .context(servers_error::InvalidFlightTicketSnafu)?;
966        let tracing_context = request
967            .header
968            .as_ref()
969            .map(|h| TracingContext::from_w3c(&h.tracing_context))
970            .unwrap_or_default();
971        let query_ctx = request
972            .header
973            .as_ref()
974            .map(|h| Arc::new(QueryContext::from(h)))
975            .unwrap_or(QueryContext::arc());
976
977        let result = self
978            .handle_remote_read(request, query_ctx.clone())
979            .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
980            .await?;
981
982        let stream = Box::pin(FlightRecordBatchStream::new(
983            result,
984            tracing_context,
985            self.flight_compression,
986            query_ctx,
987        ));
988        Ok(Response::new(stream))
989    }
990}
991
/// A region's engine tagged with the region's current lifecycle status.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// An opening, or creating region.
    Registering(RegionEngineRef),
    /// A closing, or dropping region.
    Deregistering(RegionEngineRef),
    /// A ready region.
    Ready(RegionEngineRef),
}
1001
1002impl RegionEngineWithStatus {
1003    /// Returns [RegionEngineRef].
1004    pub fn into_engine(self) -> RegionEngineRef {
1005        match self {
1006            RegionEngineWithStatus::Registering(engine) => engine,
1007            RegionEngineWithStatus::Deregistering(engine) => engine,
1008            RegionEngineWithStatus::Ready(engine) => engine,
1009        }
1010    }
1011}
1012
1013impl Deref for RegionEngineWithStatus {
1014    type Target = RegionEngineRef;
1015
1016    fn deref(&self) -> &Self::Target {
1017        match self {
1018            RegionEngineWithStatus::Registering(engine) => engine,
1019            RegionEngineWithStatus::Deregistering(engine) => engine,
1020            RegionEngineWithStatus::Ready(engine) => engine,
1021        }
1022    }
1023}
1024
/// State behind the region server: the registered engines, the per-region engine/status map
/// and the collaborators supplied at construction time.
struct RegionServerInner {
    /// Registered region engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// The engine of each known region together with the region's lifecycle status.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    query_engine: QueryEngineRef,
    runtime: Runtime,
    event_listener: RegionServerEventListenerRef,
    table_provider_factory: TableProviderFactoryRef,
    /// The number of queries allowed to be executed at the same time.
    /// Act as last line of defense on datanode to prevent query overloading.
    parallelism: Option<RegionServerParallelism>,
    /// The topic stats reporter.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
    /// server with a concrete engine; acceptable for now to fetch Mito-specific
    /// info (e.g., list SSTs). Consider a diagnostics trait later.
    mito_engine: RwLock<Option<MitoEngine>>,
    /// TODO(remote-dyn-filter): Reap this query-scoped placeholder registry on query finish/cancel
    /// and later fold it into the real remote dyn filter runtime state lifecycle.
    initial_remote_dyn_filter_registrations: RemoteDynFilterRegistry,
}
1045
/// Concurrent query limiter: a semaphore plus the maximum time to wait for a permit.
struct RegionServerParallelism {
    /// Semaphore created with `max_concurrent_queries` permits.
    semaphore: Arc<Semaphore>,
    /// Upper bound on how long `acquire` waits for a permit.
    timeout: Duration,
}
1050
1051impl RegionServerParallelism {
1052    pub fn from_opts(
1053        max_concurrent_queries: usize,
1054        concurrent_query_limiter_timeout: Duration,
1055    ) -> Option<Self> {
1056        if max_concurrent_queries == 0 {
1057            return None;
1058        }
1059        Some(RegionServerParallelism {
1060            semaphore: Arc::new(Semaphore::new(max_concurrent_queries)),
1061            timeout: concurrent_query_limiter_timeout,
1062        })
1063    }
1064
1065    pub async fn acquire(&self) -> Result<OwnedSemaphorePermit> {
1066        timeout(self.timeout, self.semaphore.clone().acquire_owned())
1067            .await
1068            .context(ConcurrentQueryLimiterTimeoutSnafu)?
1069            .context(ConcurrentQueryLimiterClosedSnafu)
1070    }
1071}
1072
/// Wraps a record batch stream and holds a concurrency permit until the stream is
/// fully consumed (dropped), so `max_concurrent_queries` bounds the number of
/// in-flight read streams, not just query planning.
struct PermitGuardedStream {
    /// The wrapped stream; every trait method delegates to it.
    inner: SendableRecordBatchStream,
    /// Never read; stored only so the permit lives exactly as long as this wrapper.
    _permit: OwnedSemaphorePermit,
}
1080
/// Pure delegation: the permit guard does not alter any stream metadata.
impl RecordBatchStream for PermitGuardedStream {
    /// Name of the wrapped stream.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Schema of the wrapped stream.
    fn schema(&self) -> SchemaRef {
        self.inner.schema()
    }

    /// Output ordering of the wrapped stream.
    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.inner.output_ordering()
    }

    /// Metrics of the wrapped stream.
    fn metrics(&self) -> Option<RecordBatchMetrics> {
        self.inner.metrics()
    }
}
1098
1099impl Stream for PermitGuardedStream {
1100    type Item = common_recordbatch::error::Result<RecordBatch>;
1101
1102    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1103        self.inner.as_mut().poll_next(cx)
1104    }
1105}
1106
1107/// Wraps `stream` so it holds `permit` until fully consumed. Returns `stream`
1108/// unchanged when no permit was acquired (limiter disabled).
1109fn maybe_guard_stream(
1110    stream: SendableRecordBatchStream,
1111    permit: Option<OwnedSemaphorePermit>,
1112) -> SendableRecordBatchStream {
1113    match permit {
1114        Some(permit) => Box::pin(PermitGuardedStream {
1115            inner: stream,
1116            _permit: permit,
1117        }),
1118        None => stream,
1119    }
1120}
1121
/// Outcome of resolving the engine for a region operation.
enum CurrentEngine {
    /// The engine that should handle the operation.
    Engine(RegionEngineRef),
    /// No engine call is needed; callers respond right away with this affected-row count.
    EarlyReturn(AffectedRows),
}
1126
1127impl Debug for CurrentEngine {
1128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1129        match self {
1130            CurrentEngine::Engine(engine) => f
1131                .debug_struct("CurrentEngine")
1132                .field("engine", &engine.name())
1133                .finish(),
1134            CurrentEngine::EarlyReturn(rows) => f
1135                .debug_struct("CurrentEngine")
1136                .field("return", rows)
1137                .finish(),
1138        }
1139    }
1140}
1141
1142impl RegionServerInner {
1143    pub fn new(
1144        query_engine: QueryEngineRef,
1145        runtime: Runtime,
1146        event_listener: RegionServerEventListenerRef,
1147        table_provider_factory: TableProviderFactoryRef,
1148        parallelism: Option<RegionServerParallelism>,
1149    ) -> Self {
1150        Self {
1151            engines: RwLock::new(HashMap::new()),
1152            region_map: DashMap::new(),
1153            query_engine,
1154            runtime,
1155            event_listener,
1156            table_provider_factory,
1157            parallelism,
1158            topic_stats_reporter: RwLock::new(None),
1159            mito_engine: RwLock::new(None),
1160            initial_remote_dyn_filter_registrations: RemoteDynFilterRegistry::new(),
1161        }
1162    }
1163
1164    pub fn register_engine(&self, engine: RegionEngineRef) {
1165        let engine_name = engine.name();
1166        if engine_name == MITO_ENGINE_NAME
1167            && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
1168        {
1169            *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
1170        }
1171
1172        info!("Region Engine {engine_name} is registered");
1173        self.engines
1174            .write()
1175            .unwrap()
1176            .insert(engine_name.to_string(), engine);
1177    }
1178
1179    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
1180        info!("Set topic stats reporter");
1181        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
1182    }
1183
    /// Returns `true` when `request` is a `Put`, `Delete` or `BulkInserts` request.
    fn is_ingest_request(request: &RegionRequest) -> bool {
        matches!(
            request,
            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_)
        )
    }
1190
1191    fn get_engine(
1192        &self,
1193        region_id: RegionId,
1194        region_change: &RegionChange,
1195    ) -> Result<CurrentEngine> {
1196        let current_region_status = self.region_map.get(&region_id);
1197
1198        let engine = match region_change {
1199            RegionChange::Register(attribute) => match current_region_status {
1200                Some(status) => match status.clone() {
1201                    RegionEngineWithStatus::Registering(engine) => engine,
1202                    RegionEngineWithStatus::Deregistering(_) => {
1203                        return error::RegionBusySnafu { region_id }.fail();
1204                    }
1205                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1206                },
1207                _ => self
1208                    .engines
1209                    .read()
1210                    .unwrap()
1211                    .get(attribute.engine())
1212                    .with_context(|| RegionEngineNotFoundSnafu {
1213                        name: attribute.engine(),
1214                    })?
1215                    .clone(),
1216            },
1217            RegionChange::Deregisters => match current_region_status {
1218                Some(status) => match status.clone() {
1219                    RegionEngineWithStatus::Registering(_) => {
1220                        return error::RegionBusySnafu { region_id }.fail();
1221                    }
1222                    RegionEngineWithStatus::Deregistering(_) => {
1223                        return Ok(CurrentEngine::EarlyReturn(0));
1224                    }
1225                    RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1226                },
1227                None => return Ok(CurrentEngine::EarlyReturn(0)),
1228            },
1229            RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1230                match current_region_status {
1231                    Some(status) => match status.clone() {
1232                        RegionEngineWithStatus::Registering(_) => {
1233                            return error::RegionNotReadySnafu { region_id }.fail();
1234                        }
1235                        RegionEngineWithStatus::Deregistering(_) => {
1236                            return error::RegionNotFoundSnafu { region_id }.fail();
1237                        }
1238                        RegionEngineWithStatus::Ready(engine) => engine,
1239                    },
1240                    None => return error::RegionNotFoundSnafu { region_id }.fail(),
1241                }
1242            }
1243        };
1244
1245        Ok(CurrentEngine::Engine(engine))
1246    }
1247
1248    async fn handle_batch_open_requests_inner(
1249        &self,
1250        engine: RegionEngineRef,
1251        parallelism: usize,
1252        requests: Vec<(RegionId, RegionOpenRequest)>,
1253        ignore_nonexistent_region: bool,
1254    ) -> Result<Vec<RegionId>> {
1255        let region_changes = requests
1256            .iter()
1257            .map(|(region_id, open)| {
1258                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1259                Ok((*region_id, RegionChange::Register(attribute)))
1260            })
1261            .collect::<Result<HashMap<_, _>>>()?;
1262
1263        for (&region_id, region_change) in &region_changes {
1264            self.set_region_status_not_ready(region_id, &engine, region_change)
1265        }
1266
1267        let mut open_regions = Vec::with_capacity(requests.len());
1268        let mut errors = vec![];
1269        match engine
1270            .handle_batch_open_requests(parallelism, requests)
1271            .await
1272            .with_context(|_| HandleBatchOpenRequestSnafu)
1273        {
1274            Ok(results) => {
1275                for (region_id, result) in results {
1276                    let region_change = &region_changes[&region_id];
1277                    match result {
1278                        Ok(_) => {
1279                            if let Err(e) = self
1280                                .set_region_status_ready(region_id, engine.clone(), *region_change)
1281                                .await
1282                            {
1283                                error!(e; "Failed to set region to ready: {}", region_id);
1284                                errors.push(BoxedError::new(e));
1285                            } else {
1286                                open_regions.push(region_id)
1287                            }
1288                        }
1289                        Err(e) => {
1290                            self.unset_region_status(region_id, &engine, *region_change);
1291                            if e.status_code() == StatusCode::RegionNotFound
1292                                && ignore_nonexistent_region
1293                            {
1294                                warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1295                            } else {
1296                                error!(e; "Failed to open region: {}", region_id);
1297                                errors.push(e);
1298                            }
1299                        }
1300                    }
1301                }
1302            }
1303            Err(e) => {
1304                for (&region_id, region_change) in &region_changes {
1305                    self.unset_region_status(region_id, &engine, *region_change);
1306                }
1307                error!(e; "Failed to open batch regions");
1308                errors.push(BoxedError::new(e));
1309            }
1310        }
1311
1312        if !errors.is_empty() {
1313            return error::UnexpectedSnafu {
1314                // Returns the first error.
1315                violated: format!("Failed to open batch regions: {:?}", errors[0]),
1316            }
1317            .fail();
1318        }
1319
1320        Ok(open_regions)
1321    }
1322
1323    pub async fn handle_batch_open_requests(
1324        &self,
1325        parallelism: usize,
1326        requests: Vec<(RegionId, RegionOpenRequest)>,
1327        ignore_nonexistent_region: bool,
1328    ) -> Result<Vec<RegionId>> {
1329        let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1330            HashMap::with_capacity(requests.len());
1331        for (region_id, request) in requests {
1332            if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1333                requests.push((region_id, request));
1334            } else {
1335                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1336            }
1337        }
1338
1339        let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1340        for (engine, requests) in engine_grouped_requests {
1341            let engine = self
1342                .engines
1343                .read()
1344                .unwrap()
1345                .get(&engine)
1346                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1347                .clone();
1348            results.push(
1349                self.handle_batch_open_requests_inner(
1350                    engine,
1351                    parallelism,
1352                    requests,
1353                    ignore_nonexistent_region,
1354                )
1355                .await,
1356            )
1357        }
1358
1359        Ok(results
1360            .into_iter()
1361            .collect::<Result<Vec<_>>>()?
1362            .into_iter()
1363            .flatten()
1364            .collect::<Vec<_>>())
1365    }
1366
1367    pub async fn handle_batch_catchup_requests_inner(
1368        &self,
1369        engine: RegionEngineRef,
1370        parallelism: usize,
1371        requests: Vec<(RegionId, RegionCatchupRequest)>,
1372    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1373        for (region_id, _) in &requests {
1374            self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1375        }
1376        let region_ids = requests
1377            .iter()
1378            .map(|(region_id, _)| *region_id)
1379            .collect::<Vec<_>>();
1380        let mut responses = Vec::with_capacity(requests.len());
1381        match engine
1382            .handle_batch_catchup_requests(parallelism, requests)
1383            .await
1384        {
1385            Ok(results) => {
1386                for (region_id, result) in results {
1387                    match result {
1388                        Ok(_) => {
1389                            if let Err(e) = self
1390                                .set_region_status_ready(
1391                                    region_id,
1392                                    engine.clone(),
1393                                    RegionChange::Catchup,
1394                                )
1395                                .await
1396                            {
1397                                error!(e; "Failed to set region to ready: {}", region_id);
1398                                responses.push((region_id, Err(BoxedError::new(e))));
1399                            } else {
1400                                responses.push((region_id, Ok(())));
1401                            }
1402                        }
1403                        Err(e) => {
1404                            self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1405                            error!(e; "Failed to catchup region: {}", region_id);
1406                            responses.push((region_id, Err(e)));
1407                        }
1408                    }
1409                }
1410            }
1411            Err(e) => {
1412                for region_id in region_ids {
1413                    self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1414                }
1415                error!(e; "Failed to catchup batch regions");
1416                return error::UnexpectedSnafu {
1417                    violated: format!("Failed to catchup batch regions: {:?}", e),
1418                }
1419                .fail();
1420            }
1421        }
1422
1423        Ok(responses)
1424    }
1425
1426    pub async fn handle_batch_catchup_requests(
1427        &self,
1428        parallelism: usize,
1429        requests: Vec<(RegionId, RegionCatchupRequest)>,
1430    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1431        let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1432
1433        let mut responses = Vec::with_capacity(requests.len());
1434        for (region_id, request) in requests {
1435            if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1436                match engine {
1437                    CurrentEngine::Engine(engine) => {
1438                        engine_grouped_requests
1439                            .entry(engine.name().to_string())
1440                            .or_default()
1441                            .push((region_id, request));
1442                    }
1443                    CurrentEngine::EarlyReturn(_) => {
1444                        return error::UnexpectedSnafu {
1445                            violated: format!("Unexpected engine type for region {}", region_id),
1446                        }
1447                        .fail();
1448                    }
1449                }
1450            } else {
1451                responses.push((
1452                    region_id,
1453                    Err(BoxedError::new(
1454                        error::RegionNotFoundSnafu { region_id }.build(),
1455                    )),
1456                ));
1457            }
1458        }
1459
1460        for (engine, requests) in engine_grouped_requests {
1461            let engine = self
1462                .engines
1463                .read()
1464                .unwrap()
1465                .get(&engine)
1466                .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1467                .clone();
1468            responses.extend(
1469                self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1470                    .await?,
1471            );
1472        }
1473
1474        Ok(responses)
1475    }
1476
1477    // Handle requests in batch.
1478    //
1479    // limitation: all create requests must be in the same engine.
1480    pub async fn handle_batch_request(
1481        &self,
1482        batch_request: BatchRegionDdlRequest,
1483    ) -> Result<RegionResponse> {
1484        let region_changes = match &batch_request {
1485            BatchRegionDdlRequest::Create(requests) => requests
1486                .iter()
1487                .map(|(region_id, create)| {
1488                    let attribute = parse_region_attribute(&create.engine, &create.options)?;
1489                    Ok((*region_id, RegionChange::Register(attribute)))
1490                })
1491                .collect::<Result<Vec<_>>>()?,
1492            BatchRegionDdlRequest::Drop(requests) => requests
1493                .iter()
1494                .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1495                .collect::<Vec<_>>(),
1496            BatchRegionDdlRequest::Alter(requests) => requests
1497                .iter()
1498                .map(|(region_id, _)| (*region_id, RegionChange::None))
1499                .collect::<Vec<_>>(),
1500        };
1501
1502        // The ddl procedure will ensure all requests are in the same engine.
1503        // Therefore, we can get the engine from the first request.
1504        let (first_region_id, first_region_change) = region_changes.first().unwrap();
1505        let engine = match self.get_engine(*first_region_id, first_region_change)? {
1506            CurrentEngine::Engine(engine) => engine,
1507            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1508        };
1509
1510        for (region_id, region_change) in region_changes.iter() {
1511            self.set_region_status_not_ready(*region_id, &engine, region_change);
1512        }
1513
1514        let ddl_type = batch_request.request_type();
1515        let result = engine
1516            .handle_batch_ddl_requests(batch_request)
1517            .await
1518            .context(HandleBatchDdlRequestSnafu { ddl_type });
1519
1520        match result {
1521            Ok(result) => {
1522                for (region_id, region_change) in &region_changes {
1523                    self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1524                        .await?;
1525                }
1526
1527                Ok(RegionResponse {
1528                    affected_rows: result.affected_rows,
1529                    extensions: result.extensions,
1530                    metadata: Vec::new(),
1531                })
1532            }
1533            Err(err) => {
1534                for (region_id, region_change) in region_changes {
1535                    self.unset_region_status(region_id, &engine, region_change);
1536                }
1537
1538                Err(err)
1539            }
1540        }
1541    }
1542
1543    pub async fn handle_request(
1544        &self,
1545        region_id: RegionId,
1546        request: RegionRequest,
1547    ) -> Result<RegionResponse> {
1548        let request_type = request.request_type();
1549        let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1550            .with_label_values(&[request_type])
1551            .start_timer();
1552
1553        let region_change = match &request {
1554            RegionRequest::Create(create) => {
1555                let attribute = parse_region_attribute(&create.engine, &create.options)?;
1556                RegionChange::Register(attribute)
1557            }
1558            RegionRequest::Open(open) => {
1559                let attribute = parse_region_attribute(&open.engine, &open.options)?;
1560                RegionChange::Register(attribute)
1561            }
1562            RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1563            RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1564                RegionChange::Ingest
1565            }
1566            RegionRequest::Alter(_)
1567            | RegionRequest::Flush(_)
1568            | RegionRequest::Compact(_)
1569            | RegionRequest::Truncate(_)
1570            | RegionRequest::BuildIndex(_)
1571            | RegionRequest::EnterStaging(_)
1572            | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1573            RegionRequest::Catchup(_) => RegionChange::Catchup,
1574        };
1575
1576        let engine = match self.get_engine(region_id, &region_change)? {
1577            CurrentEngine::Engine(engine) => engine,
1578            CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1579        };
1580
1581        // Sets corresponding region status to registering/deregistering before the operation.
1582        self.set_region_status_not_ready(region_id, &engine, &region_change);
1583
1584        match engine
1585            .handle_request(region_id, request)
1586            .await
1587            .with_context(|_| HandleRegionRequestSnafu { region_id })
1588        {
1589            Ok(result) => {
1590                // Update metrics
1591                if matches!(region_change, RegionChange::Ingest) {
1592                    crate::metrics::REGION_CHANGED_ROW_COUNT
1593                        .with_label_values(&[request_type])
1594                        .inc_by(result.affected_rows as u64);
1595                }
1596                // Sets corresponding region status to ready.
1597                self.set_region_status_ready(region_id, engine.clone(), region_change)
1598                    .await?;
1599
1600                Ok(RegionResponse {
1601                    affected_rows: result.affected_rows,
1602                    extensions: result.extensions,
1603                    metadata: Vec::new(),
1604                })
1605            }
1606            Err(err) => {
1607                if matches!(region_change, RegionChange::Ingest) {
1608                    crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1609                        .with_label_values(&[request_type])
1610                        .inc();
1611                }
1612                // Removes the region status if the operation fails.
1613                self.unset_region_status(region_id, &engine, region_change);
1614                Err(err)
1615            }
1616        }
1617    }
1618
1619    /// Handles the sync region request.
1620    pub async fn handle_sync_region(
1621        &self,
1622        engine: &RegionEngineRef,
1623        region_id: RegionId,
1624        request: SyncRegionFromRequest,
1625    ) -> Result<()> {
1626        let Some(new_opened_regions) = engine
1627            .sync_region(region_id, request)
1628            .await
1629            .with_context(|_| HandleRegionRequestSnafu { region_id })?
1630            .new_opened_logical_region_ids()
1631        else {
1632            return Ok(());
1633        };
1634
1635        for region in &new_opened_regions {
1636            self.region_map
1637                .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1638        }
1639        if !new_opened_regions.is_empty() {
1640            info!(
1641                region_id = %region_id,
1642                logical_region_count = new_opened_regions.len(),
1643                logical_regions = ?new_opened_regions,
1644                "Logical regions are registered"
1645            );
1646        }
1647
1648        Ok(())
1649    }
1650
1651    fn set_region_status_not_ready(
1652        &self,
1653        region_id: RegionId,
1654        engine: &RegionEngineRef,
1655        region_change: &RegionChange,
1656    ) {
1657        match region_change {
1658            RegionChange::Register(_) => {
1659                self.region_map.insert(
1660                    region_id,
1661                    RegionEngineWithStatus::Registering(engine.clone()),
1662                );
1663            }
1664            RegionChange::Deregisters => {
1665                self.region_map.insert(
1666                    region_id,
1667                    RegionEngineWithStatus::Deregistering(engine.clone()),
1668                );
1669            }
1670            _ => {}
1671        }
1672    }
1673
1674    fn unset_region_status(
1675        &self,
1676        region_id: RegionId,
1677        engine: &RegionEngineRef,
1678        region_change: RegionChange,
1679    ) {
1680        match region_change {
1681            RegionChange::None | RegionChange::Ingest => {}
1682            RegionChange::Register(_) => {
1683                self.region_map.remove(&region_id);
1684            }
1685            RegionChange::Deregisters => {
1686                self.region_map
1687                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1688            }
1689            RegionChange::Catchup => {}
1690        }
1691    }
1692
1693    async fn set_region_status_ready(
1694        &self,
1695        region_id: RegionId,
1696        engine: RegionEngineRef,
1697        region_change: RegionChange,
1698    ) -> Result<()> {
1699        let engine_type = engine.name();
1700        match region_change {
1701            RegionChange::None | RegionChange::Ingest => {}
1702            RegionChange::Register(attribute) => {
1703                info!(
1704                    "Region {region_id} is registered to engine {}",
1705                    attribute.engine()
1706                );
1707                self.region_map
1708                    .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1709
1710                match attribute {
1711                    RegionAttribute::Metric { physical } => {
1712                        if physical {
1713                            // Registers the logical regions belong to the physical region (`region_id`).
1714                            self.register_logical_regions(&engine, region_id).await?;
1715                            // We only send the `on_region_registered` event of the physical region.
1716                            self.event_listener.on_region_registered(region_id);
1717                        }
1718                    }
1719                    RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1720                    RegionAttribute::File => {
1721                        // do nothing
1722                    }
1723                }
1724            }
1725            RegionChange::Deregisters => {
1726                info!("Region {region_id} is deregistered from engine {engine_type}");
1727                self.region_map
1728                    .remove(&region_id)
1729                    .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1730                self.event_listener.on_region_deregistered(region_id);
1731            }
1732            RegionChange::Catchup => {
1733                if is_metric_engine(engine.name()) {
1734                    // Registers the logical regions belong to the physical region (`region_id`).
1735                    self.register_logical_regions(&engine, region_id).await?;
1736                }
1737            }
1738        }
1739        Ok(())
1740    }
1741
1742    async fn register_logical_regions(
1743        &self,
1744        engine: &RegionEngineRef,
1745        physical_region_id: RegionId,
1746    ) -> Result<()> {
1747        let metric_engine =
1748            engine
1749                .as_any()
1750                .downcast_ref::<MetricEngine>()
1751                .context(UnexpectedSnafu {
1752                    violated: format!(
1753                        "expecting engine type '{}', actual '{}'",
1754                        METRIC_ENGINE_NAME,
1755                        engine.name(),
1756                    ),
1757                })?;
1758
1759        let logical_regions = metric_engine
1760            .logical_regions(physical_region_id)
1761            .await
1762            .context(FindLogicalRegionsSnafu { physical_region_id })?;
1763
1764        for region in &logical_regions {
1765            self.region_map
1766                .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1767        }
1768        if !logical_regions.is_empty() {
1769            info!(
1770                physical_region_id = %physical_region_id,
1771                logical_region_count = logical_regions.len(),
1772                logical_regions = ?logical_regions,
1773                "Logical regions are registered"
1774            );
1775        }
1776        Ok(())
1777    }
1778
1779    pub async fn handle_read(
1780        self: &Arc<Self>,
1781        request: QueryRequest,
1782        query_ctx: QueryContextRef,
1783    ) -> Result<SendableRecordBatchStream> {
1784        let mut stream = self.handle_read_inner(request, query_ctx).await?;
1785        let schema = stream.schema();
1786        let output_ordering = stream.output_ordering().map(|ordering| ordering.to_vec());
1787
1788        let (sender, receiver) = mpsc::channel(QUERY_RUNTIME_STREAM_BUFFER_SIZE);
1789        let metrics = QueryRuntimeStream::metrics_store();
1790        let producer_metrics = metrics.clone();
1791
1792        let producer_handle = common_runtime::spawn_query(async move {
1793            while let Some(batch) = stream.next().await {
1794                *producer_metrics.write().unwrap() = stream.metrics();
1795                if sender.send(batch).await.is_err() {
1796                    break;
1797                }
1798            }
1799            *producer_metrics.write().unwrap() = stream.metrics();
1800        });
1801
1802        Ok(Box::pin(
1803            QueryRuntimeStream::new(schema, receiver)
1804                .with_output_ordering(output_ordering)
1805                .with_metrics_store(metrics)
1806                .with_producer_handle(producer_handle),
1807        ))
1808    }
1809
1810    async fn handle_read_inner(
1811        &self,
1812        request: QueryRequest,
1813        query_ctx: QueryContextRef,
1814    ) -> Result<SendableRecordBatchStream> {
1815        // TODO(ruihang): add metrics and set trace id
1816
1817        let result = self
1818            .query_engine
1819            .execute(request.plan, query_ctx)
1820            .await
1821            .context(ExecuteLogicalPlanSnafu)?;
1822
1823        match result.data {
1824            OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1825                UnsupportedOutputSnafu { expected: "stream" }.fail()
1826            }
1827            OutputData::Stream(stream) => Ok(stream),
1828        }
1829    }
1830
1831    async fn stop(&self) -> Result<()> {
1832        // Calling async functions while iterating inside the Dashmap could easily cause the Rust
1833        // complains "higher-ranked lifetime error". Rust can't prove some future is legit.
1834        // Possible related issue: https://github.com/rust-lang/rust/issues/102211
1835        //
1836        // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
1837        // it here, collect the values first then use later separately.
1838
1839        let regions = self
1840            .region_map
1841            .iter()
1842            .map(|x| (*x.key(), x.value().clone()))
1843            .collect::<Vec<_>>();
1844        let num_regions = regions.len();
1845
1846        for (region_id, engine) in regions {
1847            let closed = engine
1848                .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1849                .await;
1850            match closed {
1851                Ok(_) => debug!("Region {region_id} is closed"),
1852                Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1853            }
1854        }
1855        self.region_map.clear();
1856        info!("closed {num_regions} regions");
1857
1858        drop(self.mito_engine.write().unwrap().take());
1859        let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1860        for (engine_name, engine) in engines {
1861            engine
1862                .stop()
1863                .await
1864                .context(StopRegionEngineSnafu { name: &engine_name })?;
1865            info!("Region engine {engine_name} is stopped");
1866        }
1867
1868        Ok(())
1869    }
1870}
1871
/// The region-map bookkeeping that a region request implies.
///
/// The variant decides how the region's entry in the region map is updated
/// before the request runs, after it succeeds, and after it fails.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// No region-map bookkeeping is performed.
    None,
    /// The region is being registered: its entry is `Registering` while the
    /// request runs, `Ready` on success, and removed on failure.
    Register(RegionAttribute),
    /// The region is being deregistered: its entry is `Deregistering` while the
    /// request runs, removed on success, and restored to `Ready` on failure.
    Deregisters,
    /// On success, the logical regions are registered when the engine is the
    /// metric engine; the region's own entry is not changed.
    Catchup,
    /// No region-map bookkeeping; used to decide whether the changed-row and
    /// insert-failure metrics are updated.
    Ingest,
}
1880
1881fn is_metric_engine(engine: &str) -> bool {
1882    engine == METRIC_ENGINE_NAME
1883}
1884
1885fn parse_region_attribute(
1886    engine: &str,
1887    options: &HashMap<String, String>,
1888) -> Result<RegionAttribute> {
1889    match engine {
1890        MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1891        METRIC_ENGINE_NAME => {
1892            let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1893
1894            Ok(RegionAttribute::Metric { physical })
1895        }
1896        FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1897        _ => error::UnexpectedSnafu {
1898            violated: format!("Unknown engine: {}", engine),
1899        }
1900        .fail(),
1901    }
1902}
1903
/// Engine-specific attribute of a region, as produced by
/// `parse_region_attribute`.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// The region belongs to the Mito engine.
    Mito,
    /// The region belongs to the metric engine. `physical` is `true` when the
    /// region's options do not contain `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// The region belongs to the file engine.
    File,
}
1910
1911impl RegionAttribute {
1912    fn engine(&self) -> &'static str {
1913        match self {
1914            RegionAttribute::Mito => MITO_ENGINE_NAME,
1915            RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1916            RegionAttribute::File => FILE_ENGINE_NAME,
1917        }
1918    }
1919}
1920
1921#[cfg(test)]
1922mod tests {
1923
1924    use std::assert_matches;
1925    use std::collections::HashMap;
1926    use std::sync::Arc;
1927
1928    use api::v1::{Rows, SemanticType};
1929    use common_error::ext::ErrorExt;
1930    use common_recordbatch::RecordBatches;
1931    use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
1932    use datatypes::prelude::{ConcreteDataType, VectorRef};
1933    use datatypes::schema::{ColumnSchema, Schema};
1934    use datatypes::vectors::Int32Vector;
1935    use futures_util::StreamExt;
1936    use mito2::test_util::CreateRequestBuilder;
1937    use query::options::FLOW_RETURN_REGION_SEQ;
1938    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1939    use store_api::region_engine::RegionEngine;
1940    use store_api::region_request::{
1941        PathType, RegionCompactRequest, RegionDeleteRequest, RegionDropRequest, RegionOpenRequest,
1942        RegionPutRequest, RegionTruncateRequest,
1943    };
1944    use store_api::storage::RegionId;
1945
1946    use super::*;
1947    use crate::tests::{MockRegionEngine, mock_region_server};
1948
1949    #[test]
1950    fn test_is_ingest_request() {
1951        let rows = || Rows {
1952            schema: Vec::new(),
1953            rows: Vec::new(),
1954        };
1955
1956        assert!(RegionServerInner::is_ingest_request(&RegionRequest::Put(
1957            RegionPutRequest {
1958                rows: rows(),
1959                hint: None,
1960                partition_expr_version: None,
1961            },
1962        )));
1963        assert!(RegionServerInner::is_ingest_request(
1964            &RegionRequest::Delete(RegionDeleteRequest {
1965                rows: rows(),
1966                hint: None,
1967                partition_expr_version: None,
1968            },)
1969        ));
1970        assert!(!RegionServerInner::is_ingest_request(
1971            &RegionRequest::Compact(RegionCompactRequest::default()),
1972        ));
1973    }
1974
1975    fn single_value_stream() -> SendableRecordBatchStream {
1976        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
1977            "v",
1978            ConcreteDataType::int32_datatype(),
1979            false,
1980        )]));
1981        let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
1982        let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
1983        RecordBatches::try_new(schema, vec![batch])
1984            .unwrap()
1985            .as_stream()
1986    }
1987
1988    #[tokio::test]
1989    async fn test_region_watermark_stream_only_sets_terminal_metrics() {
1990        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
1991            "v",
1992            ConcreteDataType::int32_datatype(),
1993            false,
1994        )]));
1995        let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
1996        let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
1997        let stream = RecordBatches::try_new(schema, vec![batch])
1998            .unwrap()
1999            .as_stream();
2000
2001        let region_id = RegionId::new(42, 7);
2002        let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
2003        let mut pinned = Box::pin(wrapped);
2004
2005        assert!(pinned.as_ref().get_ref().metrics().is_none());
2006        while pinned.next().await.is_some() {}
2007
2008        let metrics = pinned.as_ref().get_ref().metrics().unwrap();
2009        assert_eq!(
2010            metrics.region_watermarks,
2011            vec![RegionWatermarkEntry {
2012                region_id: region_id.as_u64(),
2013                watermark: Some(99),
2014            }]
2015        );
2016    }
2017
2018    #[test]
2019    fn test_region_watermark_stream_preserves_unproved_watermark() {
2020        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
2021            "v",
2022            ConcreteDataType::int32_datatype(),
2023            false,
2024        )]));
2025        let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
2026        let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
2027        let stream = RecordBatches::try_new(schema, vec![batch])
2028            .unwrap()
2029            .as_stream();
2030
2031        let region_id = RegionId::new(42, 7);
2032        let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
2033        let metrics = RecordBatchMetrics {
2034            region_watermarks: vec![RegionWatermarkEntry {
2035                region_id: region_id.as_u64(),
2036                watermark: None,
2037            }],
2038            ..Default::default()
2039        };
2040
2041        let merged = wrapped.merged_metrics(metrics);
2042        assert_eq!(
2043            merged.region_watermarks,
2044            vec![RegionWatermarkEntry {
2045                region_id: region_id.as_u64(),
2046                watermark: None,
2047            }]
2048        );
2049    }
2050
2051    #[tokio::test]
2052    async fn test_wrap_flow_region_watermark_stream_adds_terminal_metrics() {
2053        let region_id = RegionId::new(42, 7);
2054        let query_ctx = Arc::new(
2055            QueryContextBuilder::default()
2056                .extensions(HashMap::from([(
2057                    FLOW_RETURN_REGION_SEQ.to_string(),
2058                    "true".to_string(),
2059                )]))
2060                .build(),
2061        );
2062        query_ctx.set_snapshot(region_id.as_u64(), 99);
2063
2064        let wrapped =
2065            wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
2066        let mut pinned = Box::pin(wrapped);
2067        while pinned.next().await.is_some() {}
2068
2069        let metrics = pinned.as_ref().get_ref().metrics().unwrap();
2070        assert_eq!(
2071            metrics.region_watermarks,
2072            vec![RegionWatermarkEntry {
2073                region_id: region_id.as_u64(),
2074                watermark: Some(99),
2075            }]
2076        );
2077    }
2078
2079    #[tokio::test]
2080    async fn test_wrap_flow_region_watermark_stream_skips_without_extension() {
2081        let region_id = RegionId::new(42, 7);
2082        let query_ctx = Arc::new(QueryContextBuilder::default().build());
2083        query_ctx.set_snapshot(region_id.as_u64(), 99);
2084
2085        let wrapped =
2086            wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
2087        let mut pinned = Box::pin(wrapped);
2088        while pinned.next().await.is_some() {}
2089
2090        assert!(pinned.as_ref().get_ref().metrics().is_none());
2091    }
2092
2093    #[tokio::test]
2094    async fn test_wrap_flow_region_watermark_stream_skips_without_snapshot() {
2095        let region_id = RegionId::new(42, 7);
2096        let query_ctx = Arc::new(
2097            QueryContextBuilder::default()
2098                .extensions(HashMap::from([(
2099                    FLOW_RETURN_REGION_SEQ.to_string(),
2100                    "true".to_string(),
2101                )]))
2102                .build(),
2103        );
2104
2105        let wrapped =
2106            wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
2107        let mut pinned = Box::pin(wrapped);
2108        while pinned.next().await.is_some() {}
2109
2110        assert!(pinned.as_ref().get_ref().metrics().is_none());
2111    }
2112
2113    #[tokio::test]
2114    async fn test_region_registering() {
2115        common_telemetry::init_default_ut_logging();
2116
2117        let mut mock_region_server = mock_region_server();
2118        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
2119        let engine_name = engine.name();
2120        mock_region_server.register_engine(engine.clone());
2121        let region_id = RegionId::new(1, 1);
2122        let builder = CreateRequestBuilder::new();
2123        let create_req = builder.build();
2124        // Tries to create/open a registering region.
2125        mock_region_server.inner.region_map.insert(
2126            region_id,
2127            RegionEngineWithStatus::Registering(engine.clone()),
2128        );
2129        let response = mock_region_server
2130            .handle_request(region_id, RegionRequest::Create(create_req))
2131            .await
2132            .unwrap();
2133        assert_eq!(response.affected_rows, 0);
2134        let status = mock_region_server
2135            .inner
2136            .region_map
2137            .get(&region_id)
2138            .unwrap()
2139            .clone();
2140        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
2141
2142        mock_region_server.inner.region_map.insert(
2143            region_id,
2144            RegionEngineWithStatus::Registering(engine.clone()),
2145        );
2146        let response = mock_region_server
2147            .handle_request(
2148                region_id,
2149                RegionRequest::Open(RegionOpenRequest {
2150                    engine: engine_name.to_string(),
2151                    table_dir: String::new(),
2152                    path_type: PathType::Bare,
2153                    options: Default::default(),
2154                    skip_wal_replay: false,
2155                    checkpoint: None,
2156                    requirements: Default::default(),
2157                }),
2158            )
2159            .await
2160            .unwrap();
2161        assert_eq!(response.affected_rows, 0);
2162        let status = mock_region_server
2163            .inner
2164            .region_map
2165            .get(&region_id)
2166            .unwrap()
2167            .clone();
2168        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
2169    }
2170
2171    #[tokio::test]
2172    async fn test_region_deregistering() {
2173        common_telemetry::init_default_ut_logging();
2174
2175        let mut mock_region_server = mock_region_server();
2176        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
2177
2178        mock_region_server.register_engine(engine.clone());
2179
2180        let region_id = RegionId::new(1, 1);
2181
2182        // Tries to drop/close a registering region.
2183        mock_region_server.inner.region_map.insert(
2184            region_id,
2185            RegionEngineWithStatus::Deregistering(engine.clone()),
2186        );
2187
2188        let response = mock_region_server
2189            .handle_request(
2190                region_id,
2191                RegionRequest::Drop(RegionDropRequest {
2192                    fast_path: false,
2193                    force: false,
2194                    partial_drop: false,
2195                }),
2196            )
2197            .await
2198            .unwrap();
2199        assert_eq!(response.affected_rows, 0);
2200
2201        let status = mock_region_server
2202            .inner
2203            .region_map
2204            .get(&region_id)
2205            .unwrap()
2206            .clone();
2207        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
2208
2209        mock_region_server.inner.region_map.insert(
2210            region_id,
2211            RegionEngineWithStatus::Deregistering(engine.clone()),
2212        );
2213
2214        let response = mock_region_server
2215            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
2216            .await
2217            .unwrap();
2218        assert_eq!(response.affected_rows, 0);
2219
2220        let status = mock_region_server
2221            .inner
2222            .region_map
2223            .get(&region_id)
2224            .unwrap()
2225            .clone();
2226        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
2227    }
2228
2229    #[tokio::test]
2230    async fn test_region_not_ready() {
2231        common_telemetry::init_default_ut_logging();
2232
2233        let mut mock_region_server = mock_region_server();
2234        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
2235
2236        mock_region_server.register_engine(engine.clone());
2237
2238        let region_id = RegionId::new(1, 1);
2239
2240        // Tries to drop/close a registering region.
2241        mock_region_server.inner.region_map.insert(
2242            region_id,
2243            RegionEngineWithStatus::Registering(engine.clone()),
2244        );
2245
2246        let err = mock_region_server
2247            .handle_request(
2248                region_id,
2249                RegionRequest::Truncate(RegionTruncateRequest::All),
2250            )
2251            .await
2252            .unwrap_err();
2253
2254        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2255    }
2256
2257    #[tokio::test]
2258    async fn test_region_request_failed() {
2259        common_telemetry::init_default_ut_logging();
2260
2261        let mut mock_region_server = mock_region_server();
2262        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
2263            MITO_ENGINE_NAME,
2264            Box::new(|_region_id, _request| {
2265                error::UnexpectedSnafu {
2266                    violated: "test".to_string(),
2267                }
2268                .fail()
2269            }),
2270        );
2271
2272        mock_region_server.register_engine(engine.clone());
2273
2274        let region_id = RegionId::new(1, 1);
2275        let builder = CreateRequestBuilder::new();
2276        let create_req = builder.build();
2277        mock_region_server
2278            .handle_request(region_id, RegionRequest::Create(create_req))
2279            .await
2280            .unwrap_err();
2281
2282        let status = mock_region_server.inner.region_map.get(&region_id);
2283        assert!(status.is_none());
2284
2285        mock_region_server
2286            .inner
2287            .region_map
2288            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
2289
2290        mock_region_server
2291            .handle_request(
2292                region_id,
2293                RegionRequest::Drop(RegionDropRequest {
2294                    fast_path: false,
2295                    force: false,
2296                    partial_drop: false,
2297                }),
2298            )
2299            .await
2300            .unwrap_err();
2301
2302        let status = mock_region_server.inner.region_map.get(&region_id);
2303        assert!(status.is_some());
2304    }
2305
2306    #[tokio::test]
2307    async fn test_batch_open_region_ignore_nonexistent_regions() {
2308        common_telemetry::init_default_ut_logging();
2309        let mut mock_region_server = mock_region_server();
2310        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
2311            MITO_ENGINE_NAME,
2312            Box::new(|region_id, _request| {
2313                if region_id == RegionId::new(1, 1) {
2314                    error::RegionNotFoundSnafu { region_id }.fail()
2315                } else {
2316                    Ok(0)
2317                }
2318            }),
2319        );
2320        mock_region_server.register_engine(engine.clone());
2321
2322        let region_ids = mock_region_server
2323            .handle_batch_open_requests(
2324                8,
2325                vec![
2326                    (
2327                        RegionId::new(1, 1),
2328                        RegionOpenRequest {
2329                            engine: MITO_ENGINE_NAME.to_string(),
2330                            table_dir: String::new(),
2331                            path_type: PathType::Bare,
2332                            options: Default::default(),
2333                            skip_wal_replay: false,
2334                            checkpoint: None,
2335                            requirements: Default::default(),
2336                        },
2337                    ),
2338                    (
2339                        RegionId::new(1, 2),
2340                        RegionOpenRequest {
2341                            engine: MITO_ENGINE_NAME.to_string(),
2342                            table_dir: String::new(),
2343                            path_type: PathType::Bare,
2344                            options: Default::default(),
2345                            skip_wal_replay: false,
2346                            checkpoint: None,
2347                            requirements: Default::default(),
2348                        },
2349                    ),
2350                ],
2351                true,
2352            )
2353            .await
2354            .unwrap();
2355        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
2356
2357        let err = mock_region_server
2358            .handle_batch_open_requests(
2359                8,
2360                vec![
2361                    (
2362                        RegionId::new(1, 1),
2363                        RegionOpenRequest {
2364                            engine: MITO_ENGINE_NAME.to_string(),
2365                            table_dir: String::new(),
2366                            path_type: PathType::Bare,
2367                            options: Default::default(),
2368                            skip_wal_replay: false,
2369                            checkpoint: None,
2370                            requirements: Default::default(),
2371                        },
2372                    ),
2373                    (
2374                        RegionId::new(1, 2),
2375                        RegionOpenRequest {
2376                            engine: MITO_ENGINE_NAME.to_string(),
2377                            table_dir: String::new(),
2378                            path_type: PathType::Bare,
2379                            options: Default::default(),
2380                            skip_wal_replay: false,
2381                            checkpoint: None,
2382                            requirements: Default::default(),
2383                        },
2384                    ),
2385                ],
2386                false,
2387            )
2388            .await
2389            .unwrap_err();
2390        assert_eq!(err.status_code(), StatusCode::Unexpected);
2391    }
2392
    /// One table-driven case for `test_current_engine`.
    ///
    /// NOTE(review): the code that consumes these fields is outside this view;
    /// the field descriptions below are inferred from how the cases are built
    /// and should be confirmed against that code.
    struct CurrentEngineTest {
        /// Region the case targets.
        region_id: RegionId,
        /// Presumably the status installed for the region before the lookup;
        /// `None` presumably means the region has no entry.
        current_region_status: Option<RegionEngineWithStatus>,
        /// Region change supplied to the lookup under test.
        region_change: RegionChange,
        /// Assertion run against the lookup result.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
2399
2400    #[tokio::test]
2401    async fn test_current_engine() {
2402        common_telemetry::init_default_ut_logging();
2403
2404        let mut mock_region_server = mock_region_server();
2405        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
2406        mock_region_server.register_engine(engine.clone());
2407
2408        let region_id = RegionId::new(1024, 1);
2409        let tests = vec![
2410            // RegionChange::None
2411            CurrentEngineTest {
2412                region_id,
2413                current_region_status: None,
2414                region_change: RegionChange::None,
2415                assert: Box::new(|result| {
2416                    let err = result.unwrap_err();
2417                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2418                }),
2419            },
2420            CurrentEngineTest {
2421                region_id,
2422                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2423                region_change: RegionChange::None,
2424                assert: Box::new(|result| {
2425                    let current_engine = result.unwrap();
2426                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2427                }),
2428            },
2429            CurrentEngineTest {
2430                region_id,
2431                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2432                region_change: RegionChange::None,
2433                assert: Box::new(|result| {
2434                    let err = result.unwrap_err();
2435                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2436                }),
2437            },
2438            CurrentEngineTest {
2439                region_id,
2440                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2441                region_change: RegionChange::None,
2442                assert: Box::new(|result| {
2443                    let err = result.unwrap_err();
2444                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2445                }),
2446            },
2447            // RegionChange::Register
2448            CurrentEngineTest {
2449                region_id,
2450                current_region_status: None,
2451                region_change: RegionChange::Register(RegionAttribute::Mito),
2452                assert: Box::new(|result| {
2453                    let current_engine = result.unwrap();
2454                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2455                }),
2456            },
2457            CurrentEngineTest {
2458                region_id,
2459                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2460                region_change: RegionChange::Register(RegionAttribute::Mito),
2461                assert: Box::new(|result| {
2462                    let current_engine = result.unwrap();
2463                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2464                }),
2465            },
2466            CurrentEngineTest {
2467                region_id,
2468                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2469                region_change: RegionChange::Register(RegionAttribute::Mito),
2470                assert: Box::new(|result| {
2471                    let err = result.unwrap_err();
2472                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
2473                }),
2474            },
2475            CurrentEngineTest {
2476                region_id,
2477                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2478                region_change: RegionChange::Register(RegionAttribute::Mito),
2479                assert: Box::new(|result| {
2480                    let current_engine = result.unwrap();
2481                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2482                }),
2483            },
2484            // RegionChange::Deregister
2485            CurrentEngineTest {
2486                region_id,
2487                current_region_status: None,
2488                region_change: RegionChange::Deregisters,
2489                assert: Box::new(|result| {
2490                    let current_engine = result.unwrap();
2491                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2492                }),
2493            },
2494            CurrentEngineTest {
2495                region_id,
2496                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2497                region_change: RegionChange::Deregisters,
2498                assert: Box::new(|result| {
2499                    let err = result.unwrap_err();
2500                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
2501                }),
2502            },
2503            CurrentEngineTest {
2504                region_id,
2505                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2506                region_change: RegionChange::Deregisters,
2507                assert: Box::new(|result| {
2508                    let current_engine = result.unwrap();
2509                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2510                }),
2511            },
2512            CurrentEngineTest {
2513                region_id,
2514                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2515                region_change: RegionChange::Deregisters,
2516                assert: Box::new(|result| {
2517                    let current_engine = result.unwrap();
2518                    assert_matches!(current_engine, CurrentEngine::Engine(_));
2519                }),
2520            },
2521        ];
2522
2523        for test in tests {
2524            let CurrentEngineTest {
2525                region_id,
2526                current_region_status,
2527                region_change,
2528                assert,
2529            } = test;
2530
2531            // Sets up
2532            if let Some(status) = current_region_status {
2533                mock_region_server
2534                    .inner
2535                    .region_map
2536                    .insert(region_id, status);
2537            } else {
2538                mock_region_server.inner.region_map.remove(&region_id);
2539            }
2540
2541            let result = mock_region_server
2542                .inner
2543                .get_engine(region_id, &region_change);
2544
2545            assert(result);
2546        }
2547    }
2548
2549    #[tokio::test]
2550    async fn test_region_server_parallelism() {
2551        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2552        let first_query = p.acquire().await;
2553        assert!(first_query.is_ok());
2554        let second_query = p.acquire().await;
2555        assert!(second_query.is_ok());
2556        let third_query = p.acquire().await;
2557        assert!(third_query.is_err());
2558        let err = third_query.unwrap_err();
2559        assert_eq!(
2560            err.output_msg(),
2561            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2562        );
2563        drop(first_query);
2564        let forth_query = p.acquire().await;
2565        assert!(forth_query.is_ok());
2566    }
2567
2568    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2569        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2570        metadata_builder.push_column_metadata(ColumnMetadata {
2571            column_schema: datatypes::schema::ColumnSchema::new(
2572                "timestamp",
2573                ConcreteDataType::timestamp_nanosecond_datatype(),
2574                false,
2575            ),
2576            semantic_type: SemanticType::Timestamp,
2577            column_id: 0,
2578        });
2579        metadata_builder.push_column_metadata(ColumnMetadata {
2580            column_schema: datatypes::schema::ColumnSchema::new(
2581                "file",
2582                ConcreteDataType::string_datatype(),
2583                true,
2584            ),
2585            semantic_type: SemanticType::Tag,
2586            column_id: 1,
2587        });
2588        metadata_builder.push_column_metadata(ColumnMetadata {
2589            column_schema: datatypes::schema::ColumnSchema::new(
2590                "message",
2591                ConcreteDataType::string_datatype(),
2592                true,
2593            ),
2594            semantic_type: SemanticType::Field,
2595            column_id: 2,
2596        });
2597        metadata_builder.primary_key(vec![1]);
2598        metadata_builder.build().unwrap()
2599    }
2600
2601    #[tokio::test]
2602    async fn test_handle_list_metadata_request() {
2603        common_telemetry::init_default_ut_logging();
2604
2605        let mut mock_region_server = mock_region_server();
2606        let region_id_1 = RegionId::new(1, 0);
2607        let region_id_2 = RegionId::new(2, 0);
2608
2609        let metadata_1 = mock_region_metadata(region_id_1);
2610        let metadata_2 = mock_region_metadata(region_id_2);
2611        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2612
2613        let metadata_1 = Arc::new(metadata_1);
2614        let metadata_2 = Arc::new(metadata_2);
2615        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2616            MITO_ENGINE_NAME,
2617            Box::new(move |region_id| {
2618                if region_id == region_id_1 {
2619                    Ok(metadata_1.clone())
2620                } else if region_id == region_id_2 {
2621                    Ok(metadata_2.clone())
2622                } else {
2623                    error::RegionNotFoundSnafu { region_id }.fail()
2624                }
2625            }),
2626        );
2627
2628        mock_region_server.register_engine(engine.clone());
2629        mock_region_server
2630            .inner
2631            .region_map
2632            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2633        mock_region_server
2634            .inner
2635            .region_map
2636            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2637
2638        // All regions exist.
2639        let list_metadata_request = ListMetadataRequest {
2640            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2641        };
2642        let response = mock_region_server
2643            .handle_list_metadata_request(&list_metadata_request)
2644            .await
2645            .unwrap();
2646        let decoded_metadata: Vec<Option<RegionMetadata>> =
2647            serde_json::from_slice(&response.metadata).unwrap();
2648        assert_eq!(metadatas, decoded_metadata);
2649    }
2650
2651    #[tokio::test]
2652    async fn test_handle_list_metadata_not_found() {
2653        common_telemetry::init_default_ut_logging();
2654
2655        let mut mock_region_server = mock_region_server();
2656        let region_id_1 = RegionId::new(1, 0);
2657        let region_id_2 = RegionId::new(2, 0);
2658
2659        let metadata_1 = mock_region_metadata(region_id_1);
2660        let metadatas = vec![Some(metadata_1.clone()), None];
2661
2662        let metadata_1 = Arc::new(metadata_1);
2663        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2664            MITO_ENGINE_NAME,
2665            Box::new(move |region_id| {
2666                if region_id == region_id_1 {
2667                    Ok(metadata_1.clone())
2668                } else {
2669                    error::RegionNotFoundSnafu { region_id }.fail()
2670                }
2671            }),
2672        );
2673
2674        mock_region_server.register_engine(engine.clone());
2675        mock_region_server
2676            .inner
2677            .region_map
2678            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2679
2680        // Not in region map.
2681        let list_metadata_request = ListMetadataRequest {
2682            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2683        };
2684        let response = mock_region_server
2685            .handle_list_metadata_request(&list_metadata_request)
2686            .await
2687            .unwrap();
2688        let decoded_metadata: Vec<Option<RegionMetadata>> =
2689            serde_json::from_slice(&response.metadata).unwrap();
2690        assert_eq!(metadatas, decoded_metadata);
2691
2692        // Not in region engine.
2693        mock_region_server
2694            .inner
2695            .region_map
2696            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2697        let response = mock_region_server
2698            .handle_list_metadata_request(&list_metadata_request)
2699            .await
2700            .unwrap();
2701        let decoded_metadata: Vec<Option<RegionMetadata>> =
2702            serde_json::from_slice(&response.metadata).unwrap();
2703        assert_eq!(metadatas, decoded_metadata);
2704    }
2705
2706    #[tokio::test]
2707    async fn test_handle_list_metadata_failed() {
2708        common_telemetry::init_default_ut_logging();
2709
2710        let mut mock_region_server = mock_region_server();
2711        let region_id_1 = RegionId::new(1, 0);
2712
2713        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2714            MITO_ENGINE_NAME,
2715            Box::new(move |region_id| {
2716                error::UnexpectedSnafu {
2717                    violated: format!("Failed to get region {region_id}"),
2718                }
2719                .fail()
2720            }),
2721        );
2722
2723        mock_region_server.register_engine(engine.clone());
2724        mock_region_server
2725            .inner
2726            .region_map
2727            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2728
2729        // Failed to get.
2730        let list_metadata_request = ListMetadataRequest {
2731            region_ids: vec![region_id_1.as_u64()],
2732        };
2733        mock_region_server
2734            .handle_list_metadata_request(&list_metadata_request)
2735            .await
2736            .unwrap_err();
2737    }
2738}