1mod catalog;
16mod registrations;
17mod remote_dyn_filter;
18
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::ops::Deref;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::{Arc, RwLock};
25use std::task::{Context, Poll};
26use std::time::Duration;
27
28use api::region::RegionResponse;
29use api::v1::meta::TopicStat;
30use api::v1::region::sync_request::ManifestInfo;
31use api::v1::region::{
32 ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
33};
34use api::v1::{ResponseHeader, Status};
35use arrow_flight::{FlightData, Ticket};
36use async_trait::async_trait;
37use bytes::Bytes;
38use common_error::ext::{BoxedError, ErrorExt};
39use common_error::status_code::StatusCode;
40use common_meta::datanode::TopicStatsReporter;
41use common_query::OutputData;
42use common_query::request::QueryRequest;
43use common_recordbatch::adapter::RecordBatchMetrics;
44use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
45use common_runtime::Runtime;
46use common_telemetry::tracing::{self, info_span};
47use common_telemetry::tracing_context::{FutureExt, TracingContext};
48use common_telemetry::{debug, error, info, warn};
49use dashmap::DashMap;
50use datafusion::datasource::TableProvider;
51use datafusion_common::tree_node::TreeNode;
52use datatypes::schema::SchemaRef;
53use either::Either;
54use futures_util::future::try_join_all;
55use futures_util::{Stream, StreamExt};
56use metric_engine::engine::MetricEngine;
57use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
58use prost::Message;
59use query::QueryEngineRef;
60pub use query::dummy_catalog::{
61 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
62};
63use query::options::should_collect_region_watermark_from_extensions;
64use serde_json;
65use servers::error::{
66 self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
67};
68use servers::grpc::FlightCompression;
69use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
70use servers::grpc::region_server::RegionServerHandler;
71use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
72use snafu::{OptionExt, ResultExt, ensure};
73use store_api::metric_engine_consts::{
74 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
75};
76use store_api::region_engine::{
77 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
78 RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
79 SyncRegionFromRequest,
80};
81use store_api::region_request::{
82 AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
83 RegionOpenRequest, RegionRequest,
84};
85use store_api::storage::RegionId;
86use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};
87use tokio::time::timeout;
88use tonic::{Request, Response, Result as TonicResult};
89
90use crate::error::{
91 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
92 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
93 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
94 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
95 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
96 Result, RuntimeJoinSnafu, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu,
97 UnsupportedOutputSnafu,
98};
99use crate::event_listener::RegionServerEventListenerRef;
100use crate::query_stream::QueryRuntimeStream;
101use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
102use crate::region_server::registrations::RemoteDynFilterRegistry;
103use crate::region_server::remote_dyn_filter::wrap_remote_dyn_filter_guarded_stream;
104
/// Buffer size constant for query runtime streams.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the channel
/// capacity used together with `QueryRuntimeStream` / `mpsc` — confirm at its use site.
const QUERY_RUNTIME_STREAM_BUFFER_SIZE: usize = 8;
106
107#[derive(Clone)]
108pub struct RegionServer {
109 inner: Arc<RegionServerInner>,
110 flight_compression: FlightCompression,
111 suspend: Arc<AtomicBool>,
112}
113
114pub struct RegionStat {
115 pub region_id: RegionId,
116 pub engine: String,
117 pub role: RegionRole,
118}
119
120impl RegionServer {
121 pub fn new(
122 query_engine: QueryEngineRef,
123 runtime: Runtime,
124 event_listener: RegionServerEventListenerRef,
125 flight_compression: FlightCompression,
126 ) -> Self {
127 Self::with_table_provider(
128 query_engine,
129 runtime,
130 event_listener,
131 Arc::new(DummyTableProviderFactory),
132 0,
133 Duration::from_millis(0),
134 flight_compression,
135 )
136 }
137
138 pub fn with_table_provider(
139 query_engine: QueryEngineRef,
140 runtime: Runtime,
141 event_listener: RegionServerEventListenerRef,
142 table_provider_factory: TableProviderFactoryRef,
143 max_concurrent_queries: usize,
144 concurrent_query_limiter_timeout: Duration,
145 flight_compression: FlightCompression,
146 ) -> Self {
147 Self {
148 inner: Arc::new(RegionServerInner::new(
149 query_engine,
150 runtime,
151 event_listener,
152 table_provider_factory,
153 RegionServerParallelism::from_opts(
154 max_concurrent_queries,
155 concurrent_query_limiter_timeout,
156 ),
157 )),
158 flight_compression,
159 suspend: Arc::new(AtomicBool::new(false)),
160 }
161 }
162
163 pub fn register_engine(&mut self, engine: RegionEngineRef) {
165 self.inner.register_engine(engine);
166 }
167
168 pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
170 self.inner.set_topic_stats_reporter(topic_stats_reporter);
171 }
172
173 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
175 match self.inner.get_engine(region_id, &RegionChange::None) {
176 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
177 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
178 Err(error::Error::RegionNotFound { .. }) => Ok(None),
179 Err(err) => Err(err),
180 }
181 }
182
183 pub fn mito_engine(&self) -> Option<MitoEngine> {
185 if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
186 Some(mito)
187 } else {
188 self.inner
189 .engines
190 .read()
191 .unwrap()
192 .get(MITO_ENGINE_NAME)
193 .cloned()
194 .and_then(|e| {
195 let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
196 if mito.is_none() {
197 warn!("Mito engine not found in region server engines");
198 }
199 mito
200 })
201 }
202 }
203
204 #[tracing::instrument(skip_all)]
205 pub async fn handle_batch_open_requests(
206 &self,
207 parallelism: usize,
208 requests: Vec<(RegionId, RegionOpenRequest)>,
209 ignore_nonexistent_region: bool,
210 ) -> Result<Vec<RegionId>> {
211 self.inner
212 .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
213 .await
214 }
215
216 #[tracing::instrument(skip_all)]
217 pub async fn handle_batch_catchup_requests(
218 &self,
219 parallelism: usize,
220 requests: Vec<(RegionId, RegionCatchupRequest)>,
221 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
222 self.inner
223 .handle_batch_catchup_requests(parallelism, requests)
224 .await
225 }
226
227 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
228 pub async fn handle_request(
229 &self,
230 region_id: RegionId,
231 request: RegionRequest,
232 ) -> Result<RegionResponse> {
233 if RegionServerInner::is_ingest_request(&request) {
234 let inner = self.inner.clone();
235 let request_type = request.request_type();
236 return common_runtime::spawn_ingest(async move {
237 inner.handle_request(region_id, request).await
238 })
239 .await
240 .context(RuntimeJoinSnafu { request_type })?;
241 }
242
243 self.inner.handle_request(region_id, request).await
244 }
245
246 async fn table_provider(
248 &self,
249 region_id: RegionId,
250 ctx: Option<QueryContextRef>,
251 ) -> Result<Arc<dyn TableProvider>> {
252 let status = self
253 .inner
254 .region_map
255 .get(®ion_id)
256 .context(RegionNotFoundSnafu { region_id })?
257 .clone();
258 ensure!(
259 matches!(status, RegionEngineWithStatus::Ready(_)),
260 RegionNotReadySnafu { region_id }
261 );
262
263 let provider = self
264 .inner
265 .table_provider_factory
266 .create(region_id, status.into_engine(), ctx)
267 .await
268 .context(ExecuteLogicalPlanSnafu)?;
269
270 Ok(provider)
271 }
272
273 pub async fn handle_remote_read(
275 &self,
276 request: api::v1::region::QueryRequest,
277 query_ctx: QueryContextRef,
278 ) -> Result<SendableRecordBatchStream> {
279 self.handle_remote_read_inner(request, query_ctx).await
280 }
281
282 async fn handle_remote_read_inner(
283 &self,
284 request: api::v1::region::QueryRequest,
285 query_ctx: QueryContextRef,
286 ) -> Result<SendableRecordBatchStream> {
287 let permit = if let Some(p) = &self.inner.parallelism {
288 Some(p.acquire().await?)
289 } else {
290 None
291 };
292
293 let region_id = RegionId::from_u64(request.region_id);
294 let catalog_list = Arc::new(NameAwareCatalogList::new(
295 self.clone(),
296 region_id,
297 query_ctx.clone(),
298 ));
299
300 if query_ctx.explain_verbose() {
301 common_telemetry::info!("Handle remote read for region: {}", region_id);
302 }
303
304 let decoder = self
305 .inner
306 .query_engine
307 .engine_context(query_ctx.clone())
308 .new_plan_decoder()
309 .context(NewPlanDecoderSnafu)?;
310
311 let plan = decoder
312 .decode(Bytes::from(request.plan), catalog_list, false)
313 .await
314 .context(DecodeLogicalPlanSnafu)?;
315
316 let cleanup = self.register_initial_remote_dyn_filter_cleanup(&query_ctx, region_id);
317
318 let stream = self
319 .inner
320 .handle_read(
321 QueryRequest {
322 header: request.header,
323 region_id,
324 plan,
325 },
326 query_ctx.clone(),
327 )
328 .await?;
329
330 let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
331 let stream = if let Some(cleanup) = cleanup {
332 wrap_remote_dyn_filter_guarded_stream(stream, cleanup)
333 } else {
334 stream
335 };
336 Ok(maybe_guard_stream(stream, permit))
337 }
338
339 #[tracing::instrument(skip_all)]
340 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
341 self.handle_read_inner(request).await
342 }
343
344 async fn handle_read_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
345 let permit = if let Some(p) = &self.inner.parallelism {
346 Some(p.acquire().await?)
347 } else {
348 None
349 };
350
351 let ctx = request.header.as_ref().map(|h| h.into());
352 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
353
354 let region_id = request.region_id;
355 let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
356 .context(DataFusionSnafu)?;
357 let mut injector = injector_builder
358 .build(self, request.region_id, query_ctx.clone())
359 .await?;
360
361 let plan = request
362 .plan
363 .rewrite(&mut injector)
364 .context(DataFusionSnafu)?
365 .data;
366
367 let cleanup = self.register_initial_remote_dyn_filter_cleanup(&query_ctx, region_id);
368
369 let stream = self
370 .inner
371 .handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
372 .await?;
373
374 let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
375 let stream = if let Some(cleanup) = cleanup {
376 wrap_remote_dyn_filter_guarded_stream(stream, cleanup)
377 } else {
378 stream
379 };
380 Ok(maybe_guard_stream(stream, permit))
381 }
382
383 pub fn reportable_regions(&self) -> Vec<RegionStat> {
387 self.inner
388 .region_map
389 .iter()
390 .filter_map(|e| {
391 let region_id = *e.key();
392 e.role(region_id).map(|role| RegionStat {
394 region_id,
395 engine: e.value().name().to_string(),
396 role,
397 })
398 })
399 .collect()
400 }
401
402 pub fn topic_stats(&self) -> Vec<TopicStat> {
404 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
405 let Some(reporter) = reporter.as_mut() else {
406 return vec![];
407 };
408 reporter
409 .reportable_topics()
410 .into_iter()
411 .map(|stat| TopicStat {
412 topic_name: stat.topic,
413 record_size: stat.record_size,
414 record_num: stat.record_num,
415 latest_entry_id: stat.latest_entry_id,
416 })
417 .collect()
418 }
419
420 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
421 self.inner.region_map.get(®ion_id).and_then(|engine| {
422 engine.role(region_id).map(|role| match role {
423 RegionRole::Follower => false,
424 RegionRole::Leader => true,
425 RegionRole::StagingLeader => true,
426 RegionRole::DowngradingLeader => true,
427 })
428 })
429 }
430
431 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
432 let engine = self
433 .inner
434 .region_map
435 .get(®ion_id)
436 .with_context(|| RegionNotFoundSnafu { region_id })?;
437 engine
438 .set_region_role(region_id, role)
439 .with_context(|_| HandleRegionRequestSnafu { region_id })
440 }
441
442 pub async fn set_region_role_state_gracefully(
452 &self,
453 region_id: RegionId,
454 state: SettableRegionRoleState,
455 ) -> Result<SetRegionRoleStateResponse> {
456 match self.inner.region_map.get(®ion_id) {
457 Some(engine) => Ok(engine
458 .set_region_role_state_gracefully(region_id, state)
459 .await
460 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
461 None => Ok(SetRegionRoleStateResponse::NotFound),
462 }
463 }
464
465 pub fn runtime(&self) -> Runtime {
466 self.inner.runtime.clone()
467 }
468
469 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
470 match self.inner.region_map.get(®ion_id) {
471 Some(e) => e.region_statistic(region_id),
472 None => None,
473 }
474 }
475
476 pub async fn stop(&self) -> Result<()> {
478 self.inner.stop().await
479 }
480
481 #[cfg(test)]
482 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
484 {
485 let mut engines = self.inner.engines.write().unwrap();
486 if !engines.contains_key(engine.name()) {
487 debug!("Registering test engine: {}", engine.name());
488 engines.insert(engine.name().to_string(), engine.clone());
489 }
490 }
491
492 self.inner
493 .region_map
494 .insert(region_id, RegionEngineWithStatus::Ready(engine));
495 }
496
497 async fn handle_batch_ddl_requests(
498 &self,
499 request: region_request::Body,
500 ) -> Result<RegionResponse> {
501 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
503 .context(BuildRegionRequestsSnafu)?
504 .unwrap();
505 let tracing_context = TracingContext::from_current_span();
506
507 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
508 self.inner
509 .handle_batch_request(batch_request)
510 .trace(span)
511 .await
512 }
513
514 async fn handle_requests_in_parallel(
515 &self,
516 request: region_request::Body,
517 ) -> Result<RegionResponse> {
518 let requests =
519 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
520
521 match self.try_handle_metric_batch_puts(requests).await? {
524 Either::Left(response) => Ok(response),
525 Either::Right(requests) => {
526 let tracing_context = TracingContext::from_current_span();
528 let join_tasks =
529 requests
530 .into_iter()
531 .map(|(region_id, req): (RegionId, RegionRequest)| {
532 let self_to_move = self;
533 let span = tracing_context.attach(info_span!(
534 "RegionServer::handle_region_request",
535 region_id = region_id.to_string()
536 ));
537 async move {
538 self_to_move
539 .handle_request(region_id, req)
540 .trace(span)
541 .await
542 }
543 });
544
545 let results = try_join_all(join_tasks).await?;
546 let mut affected_rows = 0;
547 let mut extensions = HashMap::new();
548 for result in results {
549 affected_rows += result.affected_rows;
550 extensions.extend(result.extensions);
551 }
552
553 Ok(RegionResponse {
554 affected_rows,
555 extensions,
556 metadata: Vec::new(),
557 })
558 }
559 }
560 }
561
562 async fn handle_requests_in_serial(
563 &self,
564 request: region_request::Body,
565 ) -> Result<RegionResponse> {
566 let requests =
567 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
568 let tracing_context = TracingContext::from_current_span();
569
570 let mut affected_rows = 0;
571 let mut extensions = HashMap::new();
572 for (region_id, req) in requests {
573 let span = tracing_context.attach(info_span!(
574 "RegionServer::handle_region_request",
575 region_id = region_id.to_string()
576 ));
577 let result = self.handle_request(region_id, req).trace(span).await?;
578
579 affected_rows += result.affected_rows;
580 extensions.extend(result.extensions);
581 }
582
583 Ok(RegionResponse {
584 affected_rows,
585 extensions,
586 metadata: Vec::new(),
587 })
588 }
589
590 async fn try_handle_metric_batch_puts(
597 &self,
598 requests: Vec<(RegionId, RegionRequest)>,
599 ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
600 if requests.is_empty() {
601 return Ok(Either::Right(requests));
602 }
603
604 if !matches!(requests[0].1, RegionRequest::Put(_)) {
606 return Ok(Either::Right(requests));
607 }
608 let first_region_id = requests[0].0;
609 let request_type = requests[0].1.request_type();
610
611 let engine = match self
615 .inner
616 .get_engine(first_region_id, &RegionChange::None)?
617 {
618 CurrentEngine::Engine(e) => e,
619 _ => return Ok(Either::Right(requests)),
620 };
621
622 if engine.name() != METRIC_ENGINE_NAME {
623 return Ok(Either::Right(requests));
624 }
625
626 let mut all_puts = true;
628 for (_, req) in &requests {
629 if !matches!(req, RegionRequest::Put(_)) {
630 all_puts = false;
631 break;
632 }
633 }
634
635 if !all_puts {
636 return Ok(Either::Right(requests));
637 }
638
639 let put_requests = requests.into_iter().map(|(region_id, req)| {
641 if let RegionRequest::Put(put) = req {
642 (region_id, put)
643 } else {
644 unreachable!("Already checked all are Put")
645 }
646 });
647
648 let metric_engine = engine
650 .as_any()
651 .downcast_ref::<MetricEngine>()
652 .context(UnexpectedSnafu {
653 violated: "Failed to downcast to MetricEngine",
654 })?
655 .clone();
656
657 let tracing_context = TracingContext::from_current_span();
658 let batch_size = put_requests.len();
659 let span = tracing_context.attach(info_span!(
660 "RegionServer::handle_metric_batch_puts",
661 batch_size = batch_size,
662 ));
663 let result = common_runtime::spawn_ingest(async move {
664 metric_engine
665 .put_regions_batch(put_requests)
666 .trace(span)
667 .await
668 })
669 .await
670 .context(RuntimeJoinSnafu { request_type })?
671 .map_err(BoxedError::new)
672 .context(HandleRegionRequestSnafu {
673 region_id: first_region_id,
674 });
675
676 match result {
677 Ok(total_affected) => {
678 crate::metrics::REGION_CHANGED_ROW_COUNT
679 .with_label_values(&[request_type])
680 .inc_by(total_affected as u64);
681 Ok(Either::Left(RegionResponse::new(total_affected)))
682 }
683 Err(err) => {
684 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
685 .with_label_values(&[request_type])
686 .inc_by(batch_size as u64);
687 Err(err)
688 }
689 }
690 }
691
692 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
693 let region_id = RegionId::from_u64(request.region_id);
694 let manifest_info = request
695 .manifest_info
696 .context(error::MissingRequiredFieldSnafu {
697 name: "manifest_info",
698 })?;
699
700 let manifest_info = match manifest_info {
701 ManifestInfo::MitoManifestInfo(info) => {
702 RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
703 }
704 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
705 info.data_manifest_version,
706 0,
707 info.metadata_manifest_version,
708 0,
709 ),
710 };
711
712 let tracing_context = TracingContext::from_current_span();
713 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
714
715 self.sync_region(
716 region_id,
717 SyncRegionFromRequest::from_manifest(manifest_info),
718 )
719 .trace(span)
720 .await
721 .map(|_| RegionResponse::new(AffectedRows::default()))
722 }
723
724 #[tracing::instrument(skip_all)]
729 async fn handle_list_metadata_request(
730 &self,
731 request: &ListMetadataRequest,
732 ) -> Result<RegionResponse> {
733 let mut region_metadatas = Vec::new();
734 for region_id in &request.region_ids {
736 let region_id = RegionId::from_u64(*region_id);
737 let Some(engine) = self.find_engine(region_id)? else {
739 region_metadatas.push(None);
740 continue;
741 };
742
743 match engine.get_metadata(region_id).await {
744 Ok(metadata) => region_metadatas.push(Some(metadata)),
745 Err(err) => {
746 if err.status_code() == StatusCode::RegionNotFound {
747 region_metadatas.push(None);
748 } else {
749 Err(err).with_context(|_| GetRegionMetadataSnafu {
750 engine: engine.name(),
751 region_id,
752 })?;
753 }
754 }
755 }
756 }
757
758 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
760
761 let response = RegionResponse::from_metadata(json_result);
762
763 Ok(response)
764 }
765
766 pub async fn sync_region(
768 &self,
769 region_id: RegionId,
770 request: SyncRegionFromRequest,
771 ) -> Result<()> {
772 let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
773 CurrentEngine::Engine(engine) => engine,
774 _ => {
775 return UnexpectedSnafu {
776 violated: "unexpected EarlyReturn engine status for a ready region",
777 }
778 .fail();
779 }
780 };
781
782 self.inner
783 .handle_sync_region(&engine, region_id, request)
784 .await
785 }
786
787 pub async fn remap_manifests(
789 &self,
790 request: RemapManifestsRequest,
791 ) -> Result<RemapManifestsResponse> {
792 let region_id = request.region_id;
793 let engine = match self.inner.get_engine(region_id, &RegionChange::None)? {
794 CurrentEngine::Engine(engine) => engine,
795 _ => {
796 return UnexpectedSnafu {
797 violated: "unexpected EarlyReturn engine status for a ready region",
798 }
799 .fail();
800 }
801 };
802
803 engine
804 .remap_manifests(request)
805 .await
806 .with_context(|_| HandleRegionRequestSnafu { region_id })
807 }
808
809 fn is_suspended(&self) -> bool {
810 self.suspend.load(Ordering::Relaxed)
811 }
812
813 pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
814 self.suspend.clone()
815 }
816}
817
818fn wrap_flow_region_watermark_stream(
819 stream: SendableRecordBatchStream,
820 region_id: RegionId,
821 query_ctx: &QueryContextRef,
822) -> SendableRecordBatchStream {
823 if should_collect_region_watermark_from_extensions(&query_ctx.extensions())
824 && let Some(seq) = query_ctx.get_snapshot(region_id.as_u64())
825 {
826 Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) as SendableRecordBatchStream
827 } else {
828 stream
829 }
830}
831
832struct RegionWatermarkStream {
834 stream: SendableRecordBatchStream,
835 region_id: u64,
836 snapshot_seq: u64,
837 finished: bool,
838}
839
840impl RegionWatermarkStream {
841 fn new(stream: SendableRecordBatchStream, region_id: RegionId, snapshot_seq: u64) -> Self {
842 Self {
843 stream,
844 region_id: region_id.as_u64(),
845 snapshot_seq,
846 finished: false,
847 }
848 }
849
850 fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
851 if metrics
852 .region_watermarks
853 .iter()
854 .any(|entry| entry.region_id == self.region_id)
855 {
856 return metrics;
857 }
858
859 metrics
860 .region_watermarks
861 .push(common_recordbatch::adapter::RegionWatermarkEntry {
862 region_id: self.region_id,
863 watermark: Some(self.snapshot_seq),
864 });
865 metrics
866 }
867}
868
869impl RecordBatchStream for RegionWatermarkStream {
870 fn name(&self) -> &str {
871 self.stream.name()
872 }
873
874 fn schema(&self) -> datatypes::schema::SchemaRef {
875 self.stream.schema()
876 }
877
878 fn output_ordering(&self) -> Option<&[OrderOption]> {
879 self.stream.output_ordering()
880 }
881
882 fn metrics(&self) -> Option<RecordBatchMetrics> {
883 let base = self.stream.metrics();
884 if !self.finished {
885 return base;
886 }
887
888 Some(self.merged_metrics(base.unwrap_or_default()))
889 }
890}
891
892impl Stream for RegionWatermarkStream {
893 type Item = common_recordbatch::error::Result<RecordBatch>;
894
895 fn size_hint(&self) -> (usize, Option<usize>) {
896 self.stream.size_hint()
897 }
898
899 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
900 match Pin::new(&mut self.stream).poll_next(cx) {
901 Poll::Ready(None) => {
902 self.finished = true;
903 Poll::Ready(None)
904 }
905 other => other,
906 }
907 }
908}
909
910#[async_trait]
911impl RegionServerHandler for RegionServer {
912 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
913 let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
914 .with_label_values(&[request.as_ref()]);
915 let response = match &request {
916 region_request::Body::Creates(_)
917 | region_request::Body::Drops(_)
918 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
919 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
920 self.handle_requests_in_parallel(request).await
921 }
922 region_request::Body::Sync(sync_request) => {
923 self.handle_sync_region_request(sync_request).await
924 }
925 region_request::Body::ListMetadata(list_metadata_request) => {
926 self.handle_list_metadata_request(list_metadata_request)
927 .await
928 }
929 region_request::Body::RemoteDynFilter(remote_dyn_filter_request) => {
930 self.handle_remote_dyn_filter_request(remote_dyn_filter_request)
931 .await
932 }
933 _ => self.handle_requests_in_serial(request).await,
934 }
935 .map_err(BoxedError::new)
936 .inspect_err(|_| {
937 failed_requests_cnt.inc();
938 })
939 .context(ExecuteGrpcRequestSnafu)?;
940
941 Ok(RegionResponseV1 {
942 header: Some(ResponseHeader {
943 status: Some(Status {
944 status_code: StatusCode::Success as _,
945 ..Default::default()
946 }),
947 }),
948 affected_rows: response.affected_rows as _,
949 extensions: response.extensions,
950 metadata: response.metadata,
951 })
952 }
953}
954
955#[async_trait]
956impl FlightCraft for RegionServer {
957 async fn do_get(
958 &self,
959 request: Request<Ticket>,
960 ) -> TonicResult<Response<TonicStream<FlightData>>> {
961 ensure!(!self.is_suspended(), SuspendedSnafu);
962
963 let ticket = request.into_inner().ticket;
964 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
965 .context(servers_error::InvalidFlightTicketSnafu)?;
966 let tracing_context = request
967 .header
968 .as_ref()
969 .map(|h| TracingContext::from_w3c(&h.tracing_context))
970 .unwrap_or_default();
971 let query_ctx = request
972 .header
973 .as_ref()
974 .map(|h| Arc::new(QueryContext::from(h)))
975 .unwrap_or(QueryContext::arc());
976
977 let result = self
978 .handle_remote_read(request, query_ctx.clone())
979 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
980 .await?;
981
982 let stream = Box::pin(FlightRecordBatchStream::new(
983 result,
984 tracing_context,
985 self.flight_compression,
986 query_ctx,
987 ));
988 Ok(Response::new(stream))
989 }
990}
991
992#[derive(Clone)]
993enum RegionEngineWithStatus {
994 Registering(RegionEngineRef),
996 Deregistering(RegionEngineRef),
998 Ready(RegionEngineRef),
1000}
1001
1002impl RegionEngineWithStatus {
1003 pub fn into_engine(self) -> RegionEngineRef {
1005 match self {
1006 RegionEngineWithStatus::Registering(engine) => engine,
1007 RegionEngineWithStatus::Deregistering(engine) => engine,
1008 RegionEngineWithStatus::Ready(engine) => engine,
1009 }
1010 }
1011}
1012
1013impl Deref for RegionEngineWithStatus {
1014 type Target = RegionEngineRef;
1015
1016 fn deref(&self) -> &Self::Target {
1017 match self {
1018 RegionEngineWithStatus::Registering(engine) => engine,
1019 RegionEngineWithStatus::Deregistering(engine) => engine,
1020 RegionEngineWithStatus::Ready(engine) => engine,
1021 }
1022 }
1023}
1024
1025struct RegionServerInner {
1026 engines: RwLock<HashMap<String, RegionEngineRef>>,
1027 region_map: DashMap<RegionId, RegionEngineWithStatus>,
1028 query_engine: QueryEngineRef,
1029 runtime: Runtime,
1030 event_listener: RegionServerEventListenerRef,
1031 table_provider_factory: TableProviderFactoryRef,
1032 parallelism: Option<RegionServerParallelism>,
1035 topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
1037 mito_engine: RwLock<Option<MitoEngine>>,
1041 initial_remote_dyn_filter_registrations: RemoteDynFilterRegistry,
1044}
1045
1046struct RegionServerParallelism {
1047 semaphore: Arc<Semaphore>,
1048 timeout: Duration,
1049}
1050
1051impl RegionServerParallelism {
1052 pub fn from_opts(
1053 max_concurrent_queries: usize,
1054 concurrent_query_limiter_timeout: Duration,
1055 ) -> Option<Self> {
1056 if max_concurrent_queries == 0 {
1057 return None;
1058 }
1059 Some(RegionServerParallelism {
1060 semaphore: Arc::new(Semaphore::new(max_concurrent_queries)),
1061 timeout: concurrent_query_limiter_timeout,
1062 })
1063 }
1064
1065 pub async fn acquire(&self) -> Result<OwnedSemaphorePermit> {
1066 timeout(self.timeout, self.semaphore.clone().acquire_owned())
1067 .await
1068 .context(ConcurrentQueryLimiterTimeoutSnafu)?
1069 .context(ConcurrentQueryLimiterClosedSnafu)
1070 }
1071}
1072
1073struct PermitGuardedStream {
1077 inner: SendableRecordBatchStream,
1078 _permit: OwnedSemaphorePermit,
1079}
1080
1081impl RecordBatchStream for PermitGuardedStream {
1082 fn name(&self) -> &str {
1083 self.inner.name()
1084 }
1085
1086 fn schema(&self) -> SchemaRef {
1087 self.inner.schema()
1088 }
1089
1090 fn output_ordering(&self) -> Option<&[OrderOption]> {
1091 self.inner.output_ordering()
1092 }
1093
1094 fn metrics(&self) -> Option<RecordBatchMetrics> {
1095 self.inner.metrics()
1096 }
1097}
1098
1099impl Stream for PermitGuardedStream {
1100 type Item = common_recordbatch::error::Result<RecordBatch>;
1101
1102 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1103 self.inner.as_mut().poll_next(cx)
1104 }
1105}
1106
1107fn maybe_guard_stream(
1110 stream: SendableRecordBatchStream,
1111 permit: Option<OwnedSemaphorePermit>,
1112) -> SendableRecordBatchStream {
1113 match permit {
1114 Some(permit) => Box::pin(PermitGuardedStream {
1115 inner: stream,
1116 _permit: permit,
1117 }),
1118 None => stream,
1119 }
1120}
1121
1122enum CurrentEngine {
1123 Engine(RegionEngineRef),
1124 EarlyReturn(AffectedRows),
1125}
1126
1127impl Debug for CurrentEngine {
1128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1129 match self {
1130 CurrentEngine::Engine(engine) => f
1131 .debug_struct("CurrentEngine")
1132 .field("engine", &engine.name())
1133 .finish(),
1134 CurrentEngine::EarlyReturn(rows) => f
1135 .debug_struct("CurrentEngine")
1136 .field("return", rows)
1137 .finish(),
1138 }
1139 }
1140}
1141
1142impl RegionServerInner {
1143 pub fn new(
1144 query_engine: QueryEngineRef,
1145 runtime: Runtime,
1146 event_listener: RegionServerEventListenerRef,
1147 table_provider_factory: TableProviderFactoryRef,
1148 parallelism: Option<RegionServerParallelism>,
1149 ) -> Self {
1150 Self {
1151 engines: RwLock::new(HashMap::new()),
1152 region_map: DashMap::new(),
1153 query_engine,
1154 runtime,
1155 event_listener,
1156 table_provider_factory,
1157 parallelism,
1158 topic_stats_reporter: RwLock::new(None),
1159 mito_engine: RwLock::new(None),
1160 initial_remote_dyn_filter_registrations: RemoteDynFilterRegistry::new(),
1161 }
1162 }
1163
1164 pub fn register_engine(&self, engine: RegionEngineRef) {
1165 let engine_name = engine.name();
1166 if engine_name == MITO_ENGINE_NAME
1167 && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
1168 {
1169 *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
1170 }
1171
1172 info!("Region Engine {engine_name} is registered");
1173 self.engines
1174 .write()
1175 .unwrap()
1176 .insert(engine_name.to_string(), engine);
1177 }
1178
1179 pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
1180 info!("Set topic stats reporter");
1181 *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
1182 }
1183
1184 fn is_ingest_request(request: &RegionRequest) -> bool {
1185 matches!(
1186 request,
1187 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_)
1188 )
1189 }
1190
1191 fn get_engine(
1192 &self,
1193 region_id: RegionId,
1194 region_change: &RegionChange,
1195 ) -> Result<CurrentEngine> {
1196 let current_region_status = self.region_map.get(®ion_id);
1197
1198 let engine = match region_change {
1199 RegionChange::Register(attribute) => match current_region_status {
1200 Some(status) => match status.clone() {
1201 RegionEngineWithStatus::Registering(engine) => engine,
1202 RegionEngineWithStatus::Deregistering(_) => {
1203 return error::RegionBusySnafu { region_id }.fail();
1204 }
1205 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1206 },
1207 _ => self
1208 .engines
1209 .read()
1210 .unwrap()
1211 .get(attribute.engine())
1212 .with_context(|| RegionEngineNotFoundSnafu {
1213 name: attribute.engine(),
1214 })?
1215 .clone(),
1216 },
1217 RegionChange::Deregisters => match current_region_status {
1218 Some(status) => match status.clone() {
1219 RegionEngineWithStatus::Registering(_) => {
1220 return error::RegionBusySnafu { region_id }.fail();
1221 }
1222 RegionEngineWithStatus::Deregistering(_) => {
1223 return Ok(CurrentEngine::EarlyReturn(0));
1224 }
1225 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1226 },
1227 None => return Ok(CurrentEngine::EarlyReturn(0)),
1228 },
1229 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1230 match current_region_status {
1231 Some(status) => match status.clone() {
1232 RegionEngineWithStatus::Registering(_) => {
1233 return error::RegionNotReadySnafu { region_id }.fail();
1234 }
1235 RegionEngineWithStatus::Deregistering(_) => {
1236 return error::RegionNotFoundSnafu { region_id }.fail();
1237 }
1238 RegionEngineWithStatus::Ready(engine) => engine,
1239 },
1240 None => return error::RegionNotFoundSnafu { region_id }.fail(),
1241 }
1242 }
1243 };
1244
1245 Ok(CurrentEngine::Engine(engine))
1246 }
1247
1248 async fn handle_batch_open_requests_inner(
1249 &self,
1250 engine: RegionEngineRef,
1251 parallelism: usize,
1252 requests: Vec<(RegionId, RegionOpenRequest)>,
1253 ignore_nonexistent_region: bool,
1254 ) -> Result<Vec<RegionId>> {
1255 let region_changes = requests
1256 .iter()
1257 .map(|(region_id, open)| {
1258 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1259 Ok((*region_id, RegionChange::Register(attribute)))
1260 })
1261 .collect::<Result<HashMap<_, _>>>()?;
1262
1263 for (®ion_id, region_change) in ®ion_changes {
1264 self.set_region_status_not_ready(region_id, &engine, region_change)
1265 }
1266
1267 let mut open_regions = Vec::with_capacity(requests.len());
1268 let mut errors = vec![];
1269 match engine
1270 .handle_batch_open_requests(parallelism, requests)
1271 .await
1272 .with_context(|_| HandleBatchOpenRequestSnafu)
1273 {
1274 Ok(results) => {
1275 for (region_id, result) in results {
1276 let region_change = ®ion_changes[®ion_id];
1277 match result {
1278 Ok(_) => {
1279 if let Err(e) = self
1280 .set_region_status_ready(region_id, engine.clone(), *region_change)
1281 .await
1282 {
1283 error!(e; "Failed to set region to ready: {}", region_id);
1284 errors.push(BoxedError::new(e));
1285 } else {
1286 open_regions.push(region_id)
1287 }
1288 }
1289 Err(e) => {
1290 self.unset_region_status(region_id, &engine, *region_change);
1291 if e.status_code() == StatusCode::RegionNotFound
1292 && ignore_nonexistent_region
1293 {
1294 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1295 } else {
1296 error!(e; "Failed to open region: {}", region_id);
1297 errors.push(e);
1298 }
1299 }
1300 }
1301 }
1302 }
1303 Err(e) => {
1304 for (®ion_id, region_change) in ®ion_changes {
1305 self.unset_region_status(region_id, &engine, *region_change);
1306 }
1307 error!(e; "Failed to open batch regions");
1308 errors.push(BoxedError::new(e));
1309 }
1310 }
1311
1312 if !errors.is_empty() {
1313 return error::UnexpectedSnafu {
1314 violated: format!("Failed to open batch regions: {:?}", errors[0]),
1316 }
1317 .fail();
1318 }
1319
1320 Ok(open_regions)
1321 }
1322
1323 pub async fn handle_batch_open_requests(
1324 &self,
1325 parallelism: usize,
1326 requests: Vec<(RegionId, RegionOpenRequest)>,
1327 ignore_nonexistent_region: bool,
1328 ) -> Result<Vec<RegionId>> {
1329 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1330 HashMap::with_capacity(requests.len());
1331 for (region_id, request) in requests {
1332 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1333 requests.push((region_id, request));
1334 } else {
1335 engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1336 }
1337 }
1338
1339 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1340 for (engine, requests) in engine_grouped_requests {
1341 let engine = self
1342 .engines
1343 .read()
1344 .unwrap()
1345 .get(&engine)
1346 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1347 .clone();
1348 results.push(
1349 self.handle_batch_open_requests_inner(
1350 engine,
1351 parallelism,
1352 requests,
1353 ignore_nonexistent_region,
1354 )
1355 .await,
1356 )
1357 }
1358
1359 Ok(results
1360 .into_iter()
1361 .collect::<Result<Vec<_>>>()?
1362 .into_iter()
1363 .flatten()
1364 .collect::<Vec<_>>())
1365 }
1366
1367 pub async fn handle_batch_catchup_requests_inner(
1368 &self,
1369 engine: RegionEngineRef,
1370 parallelism: usize,
1371 requests: Vec<(RegionId, RegionCatchupRequest)>,
1372 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1373 for (region_id, _) in &requests {
1374 self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1375 }
1376 let region_ids = requests
1377 .iter()
1378 .map(|(region_id, _)| *region_id)
1379 .collect::<Vec<_>>();
1380 let mut responses = Vec::with_capacity(requests.len());
1381 match engine
1382 .handle_batch_catchup_requests(parallelism, requests)
1383 .await
1384 {
1385 Ok(results) => {
1386 for (region_id, result) in results {
1387 match result {
1388 Ok(_) => {
1389 if let Err(e) = self
1390 .set_region_status_ready(
1391 region_id,
1392 engine.clone(),
1393 RegionChange::Catchup,
1394 )
1395 .await
1396 {
1397 error!(e; "Failed to set region to ready: {}", region_id);
1398 responses.push((region_id, Err(BoxedError::new(e))));
1399 } else {
1400 responses.push((region_id, Ok(())));
1401 }
1402 }
1403 Err(e) => {
1404 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1405 error!(e; "Failed to catchup region: {}", region_id);
1406 responses.push((region_id, Err(e)));
1407 }
1408 }
1409 }
1410 }
1411 Err(e) => {
1412 for region_id in region_ids {
1413 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1414 }
1415 error!(e; "Failed to catchup batch regions");
1416 return error::UnexpectedSnafu {
1417 violated: format!("Failed to catchup batch regions: {:?}", e),
1418 }
1419 .fail();
1420 }
1421 }
1422
1423 Ok(responses)
1424 }
1425
1426 pub async fn handle_batch_catchup_requests(
1427 &self,
1428 parallelism: usize,
1429 requests: Vec<(RegionId, RegionCatchupRequest)>,
1430 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1431 let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1432
1433 let mut responses = Vec::with_capacity(requests.len());
1434 for (region_id, request) in requests {
1435 if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1436 match engine {
1437 CurrentEngine::Engine(engine) => {
1438 engine_grouped_requests
1439 .entry(engine.name().to_string())
1440 .or_default()
1441 .push((region_id, request));
1442 }
1443 CurrentEngine::EarlyReturn(_) => {
1444 return error::UnexpectedSnafu {
1445 violated: format!("Unexpected engine type for region {}", region_id),
1446 }
1447 .fail();
1448 }
1449 }
1450 } else {
1451 responses.push((
1452 region_id,
1453 Err(BoxedError::new(
1454 error::RegionNotFoundSnafu { region_id }.build(),
1455 )),
1456 ));
1457 }
1458 }
1459
1460 for (engine, requests) in engine_grouped_requests {
1461 let engine = self
1462 .engines
1463 .read()
1464 .unwrap()
1465 .get(&engine)
1466 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1467 .clone();
1468 responses.extend(
1469 self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1470 .await?,
1471 );
1472 }
1473
1474 Ok(responses)
1475 }
1476
1477 pub async fn handle_batch_request(
1481 &self,
1482 batch_request: BatchRegionDdlRequest,
1483 ) -> Result<RegionResponse> {
1484 let region_changes = match &batch_request {
1485 BatchRegionDdlRequest::Create(requests) => requests
1486 .iter()
1487 .map(|(region_id, create)| {
1488 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1489 Ok((*region_id, RegionChange::Register(attribute)))
1490 })
1491 .collect::<Result<Vec<_>>>()?,
1492 BatchRegionDdlRequest::Drop(requests) => requests
1493 .iter()
1494 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1495 .collect::<Vec<_>>(),
1496 BatchRegionDdlRequest::Alter(requests) => requests
1497 .iter()
1498 .map(|(region_id, _)| (*region_id, RegionChange::None))
1499 .collect::<Vec<_>>(),
1500 };
1501
1502 let (first_region_id, first_region_change) = region_changes.first().unwrap();
1505 let engine = match self.get_engine(*first_region_id, first_region_change)? {
1506 CurrentEngine::Engine(engine) => engine,
1507 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1508 };
1509
1510 for (region_id, region_change) in region_changes.iter() {
1511 self.set_region_status_not_ready(*region_id, &engine, region_change);
1512 }
1513
1514 let ddl_type = batch_request.request_type();
1515 let result = engine
1516 .handle_batch_ddl_requests(batch_request)
1517 .await
1518 .context(HandleBatchDdlRequestSnafu { ddl_type });
1519
1520 match result {
1521 Ok(result) => {
1522 for (region_id, region_change) in ®ion_changes {
1523 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1524 .await?;
1525 }
1526
1527 Ok(RegionResponse {
1528 affected_rows: result.affected_rows,
1529 extensions: result.extensions,
1530 metadata: Vec::new(),
1531 })
1532 }
1533 Err(err) => {
1534 for (region_id, region_change) in region_changes {
1535 self.unset_region_status(region_id, &engine, region_change);
1536 }
1537
1538 Err(err)
1539 }
1540 }
1541 }
1542
1543 pub async fn handle_request(
1544 &self,
1545 region_id: RegionId,
1546 request: RegionRequest,
1547 ) -> Result<RegionResponse> {
1548 let request_type = request.request_type();
1549 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1550 .with_label_values(&[request_type])
1551 .start_timer();
1552
1553 let region_change = match &request {
1554 RegionRequest::Create(create) => {
1555 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1556 RegionChange::Register(attribute)
1557 }
1558 RegionRequest::Open(open) => {
1559 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1560 RegionChange::Register(attribute)
1561 }
1562 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1563 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1564 RegionChange::Ingest
1565 }
1566 RegionRequest::Alter(_)
1567 | RegionRequest::Flush(_)
1568 | RegionRequest::Compact(_)
1569 | RegionRequest::Truncate(_)
1570 | RegionRequest::BuildIndex(_)
1571 | RegionRequest::EnterStaging(_)
1572 | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1573 RegionRequest::Catchup(_) => RegionChange::Catchup,
1574 };
1575
1576 let engine = match self.get_engine(region_id, ®ion_change)? {
1577 CurrentEngine::Engine(engine) => engine,
1578 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1579 };
1580
1581 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1583
1584 match engine
1585 .handle_request(region_id, request)
1586 .await
1587 .with_context(|_| HandleRegionRequestSnafu { region_id })
1588 {
1589 Ok(result) => {
1590 if matches!(region_change, RegionChange::Ingest) {
1592 crate::metrics::REGION_CHANGED_ROW_COUNT
1593 .with_label_values(&[request_type])
1594 .inc_by(result.affected_rows as u64);
1595 }
1596 self.set_region_status_ready(region_id, engine.clone(), region_change)
1598 .await?;
1599
1600 Ok(RegionResponse {
1601 affected_rows: result.affected_rows,
1602 extensions: result.extensions,
1603 metadata: Vec::new(),
1604 })
1605 }
1606 Err(err) => {
1607 if matches!(region_change, RegionChange::Ingest) {
1608 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1609 .with_label_values(&[request_type])
1610 .inc();
1611 }
1612 self.unset_region_status(region_id, &engine, region_change);
1614 Err(err)
1615 }
1616 }
1617 }
1618
1619 pub async fn handle_sync_region(
1621 &self,
1622 engine: &RegionEngineRef,
1623 region_id: RegionId,
1624 request: SyncRegionFromRequest,
1625 ) -> Result<()> {
1626 let Some(new_opened_regions) = engine
1627 .sync_region(region_id, request)
1628 .await
1629 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1630 .new_opened_logical_region_ids()
1631 else {
1632 return Ok(());
1633 };
1634
1635 for region in &new_opened_regions {
1636 self.region_map
1637 .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1638 }
1639 if !new_opened_regions.is_empty() {
1640 info!(
1641 region_id = %region_id,
1642 logical_region_count = new_opened_regions.len(),
1643 logical_regions = ?new_opened_regions,
1644 "Logical regions are registered"
1645 );
1646 }
1647
1648 Ok(())
1649 }
1650
1651 fn set_region_status_not_ready(
1652 &self,
1653 region_id: RegionId,
1654 engine: &RegionEngineRef,
1655 region_change: &RegionChange,
1656 ) {
1657 match region_change {
1658 RegionChange::Register(_) => {
1659 self.region_map.insert(
1660 region_id,
1661 RegionEngineWithStatus::Registering(engine.clone()),
1662 );
1663 }
1664 RegionChange::Deregisters => {
1665 self.region_map.insert(
1666 region_id,
1667 RegionEngineWithStatus::Deregistering(engine.clone()),
1668 );
1669 }
1670 _ => {}
1671 }
1672 }
1673
1674 fn unset_region_status(
1675 &self,
1676 region_id: RegionId,
1677 engine: &RegionEngineRef,
1678 region_change: RegionChange,
1679 ) {
1680 match region_change {
1681 RegionChange::None | RegionChange::Ingest => {}
1682 RegionChange::Register(_) => {
1683 self.region_map.remove(®ion_id);
1684 }
1685 RegionChange::Deregisters => {
1686 self.region_map
1687 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1688 }
1689 RegionChange::Catchup => {}
1690 }
1691 }
1692
1693 async fn set_region_status_ready(
1694 &self,
1695 region_id: RegionId,
1696 engine: RegionEngineRef,
1697 region_change: RegionChange,
1698 ) -> Result<()> {
1699 let engine_type = engine.name();
1700 match region_change {
1701 RegionChange::None | RegionChange::Ingest => {}
1702 RegionChange::Register(attribute) => {
1703 info!(
1704 "Region {region_id} is registered to engine {}",
1705 attribute.engine()
1706 );
1707 self.region_map
1708 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1709
1710 match attribute {
1711 RegionAttribute::Metric { physical } => {
1712 if physical {
1713 self.register_logical_regions(&engine, region_id).await?;
1715 self.event_listener.on_region_registered(region_id);
1717 }
1718 }
1719 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1720 RegionAttribute::File => {
1721 }
1723 }
1724 }
1725 RegionChange::Deregisters => {
1726 info!("Region {region_id} is deregistered from engine {engine_type}");
1727 self.region_map
1728 .remove(®ion_id)
1729 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1730 self.event_listener.on_region_deregistered(region_id);
1731 }
1732 RegionChange::Catchup => {
1733 if is_metric_engine(engine.name()) {
1734 self.register_logical_regions(&engine, region_id).await?;
1736 }
1737 }
1738 }
1739 Ok(())
1740 }
1741
1742 async fn register_logical_regions(
1743 &self,
1744 engine: &RegionEngineRef,
1745 physical_region_id: RegionId,
1746 ) -> Result<()> {
1747 let metric_engine =
1748 engine
1749 .as_any()
1750 .downcast_ref::<MetricEngine>()
1751 .context(UnexpectedSnafu {
1752 violated: format!(
1753 "expecting engine type '{}', actual '{}'",
1754 METRIC_ENGINE_NAME,
1755 engine.name(),
1756 ),
1757 })?;
1758
1759 let logical_regions = metric_engine
1760 .logical_regions(physical_region_id)
1761 .await
1762 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1763
1764 for region in &logical_regions {
1765 self.region_map
1766 .insert(*region, RegionEngineWithStatus::Ready(engine.clone()));
1767 }
1768 if !logical_regions.is_empty() {
1769 info!(
1770 physical_region_id = %physical_region_id,
1771 logical_region_count = logical_regions.len(),
1772 logical_regions = ?logical_regions,
1773 "Logical regions are registered"
1774 );
1775 }
1776 Ok(())
1777 }
1778
1779 pub async fn handle_read(
1780 self: &Arc<Self>,
1781 request: QueryRequest,
1782 query_ctx: QueryContextRef,
1783 ) -> Result<SendableRecordBatchStream> {
1784 let mut stream = self.handle_read_inner(request, query_ctx).await?;
1785 let schema = stream.schema();
1786 let output_ordering = stream.output_ordering().map(|ordering| ordering.to_vec());
1787
1788 let (sender, receiver) = mpsc::channel(QUERY_RUNTIME_STREAM_BUFFER_SIZE);
1789 let metrics = QueryRuntimeStream::metrics_store();
1790 let producer_metrics = metrics.clone();
1791
1792 let producer_handle = common_runtime::spawn_query(async move {
1793 while let Some(batch) = stream.next().await {
1794 *producer_metrics.write().unwrap() = stream.metrics();
1795 if sender.send(batch).await.is_err() {
1796 break;
1797 }
1798 }
1799 *producer_metrics.write().unwrap() = stream.metrics();
1800 });
1801
1802 Ok(Box::pin(
1803 QueryRuntimeStream::new(schema, receiver)
1804 .with_output_ordering(output_ordering)
1805 .with_metrics_store(metrics)
1806 .with_producer_handle(producer_handle),
1807 ))
1808 }
1809
1810 async fn handle_read_inner(
1811 &self,
1812 request: QueryRequest,
1813 query_ctx: QueryContextRef,
1814 ) -> Result<SendableRecordBatchStream> {
1815 let result = self
1818 .query_engine
1819 .execute(request.plan, query_ctx)
1820 .await
1821 .context(ExecuteLogicalPlanSnafu)?;
1822
1823 match result.data {
1824 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1825 UnsupportedOutputSnafu { expected: "stream" }.fail()
1826 }
1827 OutputData::Stream(stream) => Ok(stream),
1828 }
1829 }
1830
1831 async fn stop(&self) -> Result<()> {
1832 let regions = self
1840 .region_map
1841 .iter()
1842 .map(|x| (*x.key(), x.value().clone()))
1843 .collect::<Vec<_>>();
1844 let num_regions = regions.len();
1845
1846 for (region_id, engine) in regions {
1847 let closed = engine
1848 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1849 .await;
1850 match closed {
1851 Ok(_) => debug!("Region {region_id} is closed"),
1852 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1853 }
1854 }
1855 self.region_map.clear();
1856 info!("closed {num_regions} regions");
1857
1858 drop(self.mito_engine.write().unwrap().take());
1859 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1860 for (engine_name, engine) in engines {
1861 engine
1862 .stop()
1863 .await
1864 .context(StopRegionEngineSnafu { name: &engine_name })?;
1865 info!("Region engine {engine_name} is stopped");
1866 }
1867
1868 Ok(())
1869 }
1870}
1871
1872#[derive(Debug, Clone, Copy)]
1873enum RegionChange {
1874 None,
1875 Register(RegionAttribute),
1876 Deregisters,
1877 Catchup,
1878 Ingest,
1879}
1880
1881fn is_metric_engine(engine: &str) -> bool {
1882 engine == METRIC_ENGINE_NAME
1883}
1884
1885fn parse_region_attribute(
1886 engine: &str,
1887 options: &HashMap<String, String>,
1888) -> Result<RegionAttribute> {
1889 match engine {
1890 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1891 METRIC_ENGINE_NAME => {
1892 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1893
1894 Ok(RegionAttribute::Metric { physical })
1895 }
1896 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1897 _ => error::UnexpectedSnafu {
1898 violated: format!("Unknown engine: {}", engine),
1899 }
1900 .fail(),
1901 }
1902}
1903
/// Engine-specific properties of a region, derived from its create/open request.
#[derive(Clone, Copy, Debug)]
enum RegionAttribute {
    /// A region served by the mito engine.
    Mito,
    /// A region served by the metric engine.
    Metric {
        /// `true` for a physical region, `false` for a logical one.
        physical: bool,
    },
    /// A region served by the file engine.
    File,
}
1910
1911impl RegionAttribute {
1912 fn engine(&self) -> &'static str {
1913 match self {
1914 RegionAttribute::Mito => MITO_ENGINE_NAME,
1915 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1916 RegionAttribute::File => FILE_ENGINE_NAME,
1917 }
1918 }
1919}
1920
1921#[cfg(test)]
1922mod tests {
1923
1924 use std::assert_matches;
1925 use std::collections::HashMap;
1926 use std::sync::Arc;
1927
1928 use api::v1::{Rows, SemanticType};
1929 use common_error::ext::ErrorExt;
1930 use common_recordbatch::RecordBatches;
1931 use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
1932 use datatypes::prelude::{ConcreteDataType, VectorRef};
1933 use datatypes::schema::{ColumnSchema, Schema};
1934 use datatypes::vectors::Int32Vector;
1935 use futures_util::StreamExt;
1936 use mito2::test_util::CreateRequestBuilder;
1937 use query::options::FLOW_RETURN_REGION_SEQ;
1938 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1939 use store_api::region_engine::RegionEngine;
1940 use store_api::region_request::{
1941 PathType, RegionCompactRequest, RegionDeleteRequest, RegionDropRequest, RegionOpenRequest,
1942 RegionPutRequest, RegionTruncateRequest,
1943 };
1944 use store_api::storage::RegionId;
1945
1946 use super::*;
1947 use crate::tests::{MockRegionEngine, mock_region_server};
1948
1949 #[test]
1950 fn test_is_ingest_request() {
1951 let rows = || Rows {
1952 schema: Vec::new(),
1953 rows: Vec::new(),
1954 };
1955
1956 assert!(RegionServerInner::is_ingest_request(&RegionRequest::Put(
1957 RegionPutRequest {
1958 rows: rows(),
1959 hint: None,
1960 partition_expr_version: None,
1961 },
1962 )));
1963 assert!(RegionServerInner::is_ingest_request(
1964 &RegionRequest::Delete(RegionDeleteRequest {
1965 rows: rows(),
1966 hint: None,
1967 partition_expr_version: None,
1968 },)
1969 ));
1970 assert!(!RegionServerInner::is_ingest_request(
1971 &RegionRequest::Compact(RegionCompactRequest::default()),
1972 ));
1973 }
1974
1975 fn single_value_stream() -> SendableRecordBatchStream {
1976 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
1977 "v",
1978 ConcreteDataType::int32_datatype(),
1979 false,
1980 )]));
1981 let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
1982 let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
1983 RecordBatches::try_new(schema, vec![batch])
1984 .unwrap()
1985 .as_stream()
1986 }
1987
1988 #[tokio::test]
1989 async fn test_region_watermark_stream_only_sets_terminal_metrics() {
1990 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
1991 "v",
1992 ConcreteDataType::int32_datatype(),
1993 false,
1994 )]));
1995 let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
1996 let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
1997 let stream = RecordBatches::try_new(schema, vec![batch])
1998 .unwrap()
1999 .as_stream();
2000
2001 let region_id = RegionId::new(42, 7);
2002 let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
2003 let mut pinned = Box::pin(wrapped);
2004
2005 assert!(pinned.as_ref().get_ref().metrics().is_none());
2006 while pinned.next().await.is_some() {}
2007
2008 let metrics = pinned.as_ref().get_ref().metrics().unwrap();
2009 assert_eq!(
2010 metrics.region_watermarks,
2011 vec![RegionWatermarkEntry {
2012 region_id: region_id.as_u64(),
2013 watermark: Some(99),
2014 }]
2015 );
2016 }
2017
2018 #[test]
2019 fn test_region_watermark_stream_preserves_unproved_watermark() {
2020 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
2021 "v",
2022 ConcreteDataType::int32_datatype(),
2023 false,
2024 )]));
2025 let values: VectorRef = Arc::new(Int32Vector::from_slice([1]));
2026 let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
2027 let stream = RecordBatches::try_new(schema, vec![batch])
2028 .unwrap()
2029 .as_stream();
2030
2031 let region_id = RegionId::new(42, 7);
2032 let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
2033 let metrics = RecordBatchMetrics {
2034 region_watermarks: vec![RegionWatermarkEntry {
2035 region_id: region_id.as_u64(),
2036 watermark: None,
2037 }],
2038 ..Default::default()
2039 };
2040
2041 let merged = wrapped.merged_metrics(metrics);
2042 assert_eq!(
2043 merged.region_watermarks,
2044 vec![RegionWatermarkEntry {
2045 region_id: region_id.as_u64(),
2046 watermark: None,
2047 }]
2048 );
2049 }
2050
2051 #[tokio::test]
2052 async fn test_wrap_flow_region_watermark_stream_adds_terminal_metrics() {
2053 let region_id = RegionId::new(42, 7);
2054 let query_ctx = Arc::new(
2055 QueryContextBuilder::default()
2056 .extensions(HashMap::from([(
2057 FLOW_RETURN_REGION_SEQ.to_string(),
2058 "true".to_string(),
2059 )]))
2060 .build(),
2061 );
2062 query_ctx.set_snapshot(region_id.as_u64(), 99);
2063
2064 let wrapped =
2065 wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
2066 let mut pinned = Box::pin(wrapped);
2067 while pinned.next().await.is_some() {}
2068
2069 let metrics = pinned.as_ref().get_ref().metrics().unwrap();
2070 assert_eq!(
2071 metrics.region_watermarks,
2072 vec![RegionWatermarkEntry {
2073 region_id: region_id.as_u64(),
2074 watermark: Some(99),
2075 }]
2076 );
2077 }
2078
2079 #[tokio::test]
2080 async fn test_wrap_flow_region_watermark_stream_skips_without_extension() {
2081 let region_id = RegionId::new(42, 7);
2082 let query_ctx = Arc::new(QueryContextBuilder::default().build());
2083 query_ctx.set_snapshot(region_id.as_u64(), 99);
2084
2085 let wrapped =
2086 wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
2087 let mut pinned = Box::pin(wrapped);
2088 while pinned.next().await.is_some() {}
2089
2090 assert!(pinned.as_ref().get_ref().metrics().is_none());
2091 }
2092
2093 #[tokio::test]
2094 async fn test_wrap_flow_region_watermark_stream_skips_without_snapshot() {
2095 let region_id = RegionId::new(42, 7);
2096 let query_ctx = Arc::new(
2097 QueryContextBuilder::default()
2098 .extensions(HashMap::from([(
2099 FLOW_RETURN_REGION_SEQ.to_string(),
2100 "true".to_string(),
2101 )]))
2102 .build(),
2103 );
2104
2105 let wrapped =
2106 wrap_flow_region_watermark_stream(single_value_stream(), region_id, &query_ctx);
2107 let mut pinned = Box::pin(wrapped);
2108 while pinned.next().await.is_some() {}
2109
2110 assert!(pinned.as_ref().get_ref().metrics().is_none());
2111 }
2112
2113 #[tokio::test]
2114 async fn test_region_registering() {
2115 common_telemetry::init_default_ut_logging();
2116
2117 let mut mock_region_server = mock_region_server();
2118 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
2119 let engine_name = engine.name();
2120 mock_region_server.register_engine(engine.clone());
2121 let region_id = RegionId::new(1, 1);
2122 let builder = CreateRequestBuilder::new();
2123 let create_req = builder.build();
2124 mock_region_server.inner.region_map.insert(
2126 region_id,
2127 RegionEngineWithStatus::Registering(engine.clone()),
2128 );
2129 let response = mock_region_server
2130 .handle_request(region_id, RegionRequest::Create(create_req))
2131 .await
2132 .unwrap();
2133 assert_eq!(response.affected_rows, 0);
2134 let status = mock_region_server
2135 .inner
2136 .region_map
2137 .get(®ion_id)
2138 .unwrap()
2139 .clone();
2140 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
2141
2142 mock_region_server.inner.region_map.insert(
2143 region_id,
2144 RegionEngineWithStatus::Registering(engine.clone()),
2145 );
2146 let response = mock_region_server
2147 .handle_request(
2148 region_id,
2149 RegionRequest::Open(RegionOpenRequest {
2150 engine: engine_name.to_string(),
2151 table_dir: String::new(),
2152 path_type: PathType::Bare,
2153 options: Default::default(),
2154 skip_wal_replay: false,
2155 checkpoint: None,
2156 requirements: Default::default(),
2157 }),
2158 )
2159 .await
2160 .unwrap();
2161 assert_eq!(response.affected_rows, 0);
2162 let status = mock_region_server
2163 .inner
2164 .region_map
2165 .get(®ion_id)
2166 .unwrap()
2167 .clone();
2168 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
2169 }
2170
2171 #[tokio::test]
2172 async fn test_region_deregistering() {
2173 common_telemetry::init_default_ut_logging();
2174
2175 let mut mock_region_server = mock_region_server();
2176 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
2177
2178 mock_region_server.register_engine(engine.clone());
2179
2180 let region_id = RegionId::new(1, 1);
2181
2182 mock_region_server.inner.region_map.insert(
2184 region_id,
2185 RegionEngineWithStatus::Deregistering(engine.clone()),
2186 );
2187
2188 let response = mock_region_server
2189 .handle_request(
2190 region_id,
2191 RegionRequest::Drop(RegionDropRequest {
2192 fast_path: false,
2193 force: false,
2194 partial_drop: false,
2195 }),
2196 )
2197 .await
2198 .unwrap();
2199 assert_eq!(response.affected_rows, 0);
2200
2201 let status = mock_region_server
2202 .inner
2203 .region_map
2204 .get(®ion_id)
2205 .unwrap()
2206 .clone();
2207 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
2208
2209 mock_region_server.inner.region_map.insert(
2210 region_id,
2211 RegionEngineWithStatus::Deregistering(engine.clone()),
2212 );
2213
2214 let response = mock_region_server
2215 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
2216 .await
2217 .unwrap();
2218 assert_eq!(response.affected_rows, 0);
2219
2220 let status = mock_region_server
2221 .inner
2222 .region_map
2223 .get(®ion_id)
2224 .unwrap()
2225 .clone();
2226 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
2227 }
2228
2229 #[tokio::test]
2230 async fn test_region_not_ready() {
2231 common_telemetry::init_default_ut_logging();
2232
2233 let mut mock_region_server = mock_region_server();
2234 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
2235
2236 mock_region_server.register_engine(engine.clone());
2237
2238 let region_id = RegionId::new(1, 1);
2239
2240 mock_region_server.inner.region_map.insert(
2242 region_id,
2243 RegionEngineWithStatus::Registering(engine.clone()),
2244 );
2245
2246 let err = mock_region_server
2247 .handle_request(
2248 region_id,
2249 RegionRequest::Truncate(RegionTruncateRequest::All),
2250 )
2251 .await
2252 .unwrap_err();
2253
2254 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2255 }
2256
2257 #[tokio::test]
2258 async fn test_region_request_failed() {
2259 common_telemetry::init_default_ut_logging();
2260
2261 let mut mock_region_server = mock_region_server();
2262 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
2263 MITO_ENGINE_NAME,
2264 Box::new(|_region_id, _request| {
2265 error::UnexpectedSnafu {
2266 violated: "test".to_string(),
2267 }
2268 .fail()
2269 }),
2270 );
2271
2272 mock_region_server.register_engine(engine.clone());
2273
2274 let region_id = RegionId::new(1, 1);
2275 let builder = CreateRequestBuilder::new();
2276 let create_req = builder.build();
2277 mock_region_server
2278 .handle_request(region_id, RegionRequest::Create(create_req))
2279 .await
2280 .unwrap_err();
2281
2282 let status = mock_region_server.inner.region_map.get(®ion_id);
2283 assert!(status.is_none());
2284
2285 mock_region_server
2286 .inner
2287 .region_map
2288 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
2289
2290 mock_region_server
2291 .handle_request(
2292 region_id,
2293 RegionRequest::Drop(RegionDropRequest {
2294 fast_path: false,
2295 force: false,
2296 partial_drop: false,
2297 }),
2298 )
2299 .await
2300 .unwrap_err();
2301
2302 let status = mock_region_server.inner.region_map.get(®ion_id);
2303 assert!(status.is_some());
2304 }
2305
2306 #[tokio::test]
2307 async fn test_batch_open_region_ignore_nonexistent_regions() {
2308 common_telemetry::init_default_ut_logging();
2309 let mut mock_region_server = mock_region_server();
2310 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
2311 MITO_ENGINE_NAME,
2312 Box::new(|region_id, _request| {
2313 if region_id == RegionId::new(1, 1) {
2314 error::RegionNotFoundSnafu { region_id }.fail()
2315 } else {
2316 Ok(0)
2317 }
2318 }),
2319 );
2320 mock_region_server.register_engine(engine.clone());
2321
2322 let region_ids = mock_region_server
2323 .handle_batch_open_requests(
2324 8,
2325 vec![
2326 (
2327 RegionId::new(1, 1),
2328 RegionOpenRequest {
2329 engine: MITO_ENGINE_NAME.to_string(),
2330 table_dir: String::new(),
2331 path_type: PathType::Bare,
2332 options: Default::default(),
2333 skip_wal_replay: false,
2334 checkpoint: None,
2335 requirements: Default::default(),
2336 },
2337 ),
2338 (
2339 RegionId::new(1, 2),
2340 RegionOpenRequest {
2341 engine: MITO_ENGINE_NAME.to_string(),
2342 table_dir: String::new(),
2343 path_type: PathType::Bare,
2344 options: Default::default(),
2345 skip_wal_replay: false,
2346 checkpoint: None,
2347 requirements: Default::default(),
2348 },
2349 ),
2350 ],
2351 true,
2352 )
2353 .await
2354 .unwrap();
2355 assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
2356
2357 let err = mock_region_server
2358 .handle_batch_open_requests(
2359 8,
2360 vec![
2361 (
2362 RegionId::new(1, 1),
2363 RegionOpenRequest {
2364 engine: MITO_ENGINE_NAME.to_string(),
2365 table_dir: String::new(),
2366 path_type: PathType::Bare,
2367 options: Default::default(),
2368 skip_wal_replay: false,
2369 checkpoint: None,
2370 requirements: Default::default(),
2371 },
2372 ),
2373 (
2374 RegionId::new(1, 2),
2375 RegionOpenRequest {
2376 engine: MITO_ENGINE_NAME.to_string(),
2377 table_dir: String::new(),
2378 path_type: PathType::Bare,
2379 options: Default::default(),
2380 skip_wal_replay: false,
2381 checkpoint: None,
2382 requirements: Default::default(),
2383 },
2384 ),
2385 ],
2386 false,
2387 )
2388 .await
2389 .unwrap_err();
2390 assert_eq!(err.status_code(), StatusCode::Unexpected);
2391 }
2392
2393 struct CurrentEngineTest {
2394 region_id: RegionId,
2395 current_region_status: Option<RegionEngineWithStatus>,
2396 region_change: RegionChange,
2397 assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
2398 }
2399
2400 #[tokio::test]
2401 async fn test_current_engine() {
2402 common_telemetry::init_default_ut_logging();
2403
2404 let mut mock_region_server = mock_region_server();
2405 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
2406 mock_region_server.register_engine(engine.clone());
2407
2408 let region_id = RegionId::new(1024, 1);
2409 let tests = vec![
2410 CurrentEngineTest {
2412 region_id,
2413 current_region_status: None,
2414 region_change: RegionChange::None,
2415 assert: Box::new(|result| {
2416 let err = result.unwrap_err();
2417 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2418 }),
2419 },
2420 CurrentEngineTest {
2421 region_id,
2422 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2423 region_change: RegionChange::None,
2424 assert: Box::new(|result| {
2425 let current_engine = result.unwrap();
2426 assert_matches!(current_engine, CurrentEngine::Engine(_));
2427 }),
2428 },
2429 CurrentEngineTest {
2430 region_id,
2431 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2432 region_change: RegionChange::None,
2433 assert: Box::new(|result| {
2434 let err = result.unwrap_err();
2435 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
2436 }),
2437 },
2438 CurrentEngineTest {
2439 region_id,
2440 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2441 region_change: RegionChange::None,
2442 assert: Box::new(|result| {
2443 let err = result.unwrap_err();
2444 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2445 }),
2446 },
2447 CurrentEngineTest {
2449 region_id,
2450 current_region_status: None,
2451 region_change: RegionChange::Register(RegionAttribute::Mito),
2452 assert: Box::new(|result| {
2453 let current_engine = result.unwrap();
2454 assert_matches!(current_engine, CurrentEngine::Engine(_));
2455 }),
2456 },
2457 CurrentEngineTest {
2458 region_id,
2459 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2460 region_change: RegionChange::Register(RegionAttribute::Mito),
2461 assert: Box::new(|result| {
2462 let current_engine = result.unwrap();
2463 assert_matches!(current_engine, CurrentEngine::Engine(_));
2464 }),
2465 },
2466 CurrentEngineTest {
2467 region_id,
2468 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2469 region_change: RegionChange::Register(RegionAttribute::Mito),
2470 assert: Box::new(|result| {
2471 let err = result.unwrap_err();
2472 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2473 }),
2474 },
2475 CurrentEngineTest {
2476 region_id,
2477 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2478 region_change: RegionChange::Register(RegionAttribute::Mito),
2479 assert: Box::new(|result| {
2480 let current_engine = result.unwrap();
2481 assert_matches!(current_engine, CurrentEngine::Engine(_));
2482 }),
2483 },
2484 CurrentEngineTest {
2486 region_id,
2487 current_region_status: None,
2488 region_change: RegionChange::Deregisters,
2489 assert: Box::new(|result| {
2490 let current_engine = result.unwrap();
2491 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2492 }),
2493 },
2494 CurrentEngineTest {
2495 region_id,
2496 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2497 region_change: RegionChange::Deregisters,
2498 assert: Box::new(|result| {
2499 let err = result.unwrap_err();
2500 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2501 }),
2502 },
2503 CurrentEngineTest {
2504 region_id,
2505 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2506 region_change: RegionChange::Deregisters,
2507 assert: Box::new(|result| {
2508 let current_engine = result.unwrap();
2509 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2510 }),
2511 },
2512 CurrentEngineTest {
2513 region_id,
2514 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2515 region_change: RegionChange::Deregisters,
2516 assert: Box::new(|result| {
2517 let current_engine = result.unwrap();
2518 assert_matches!(current_engine, CurrentEngine::Engine(_));
2519 }),
2520 },
2521 ];
2522
2523 for test in tests {
2524 let CurrentEngineTest {
2525 region_id,
2526 current_region_status,
2527 region_change,
2528 assert,
2529 } = test;
2530
2531 if let Some(status) = current_region_status {
2533 mock_region_server
2534 .inner
2535 .region_map
2536 .insert(region_id, status);
2537 } else {
2538 mock_region_server.inner.region_map.remove(®ion_id);
2539 }
2540
2541 let result = mock_region_server
2542 .inner
2543 .get_engine(region_id, ®ion_change);
2544
2545 assert(result);
2546 }
2547 }
2548
2549 #[tokio::test]
2550 async fn test_region_server_parallelism() {
2551 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2552 let first_query = p.acquire().await;
2553 assert!(first_query.is_ok());
2554 let second_query = p.acquire().await;
2555 assert!(second_query.is_ok());
2556 let third_query = p.acquire().await;
2557 assert!(third_query.is_err());
2558 let err = third_query.unwrap_err();
2559 assert_eq!(
2560 err.output_msg(),
2561 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2562 );
2563 drop(first_query);
2564 let forth_query = p.acquire().await;
2565 assert!(forth_query.is_ok());
2566 }
2567
2568 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2569 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2570 metadata_builder.push_column_metadata(ColumnMetadata {
2571 column_schema: datatypes::schema::ColumnSchema::new(
2572 "timestamp",
2573 ConcreteDataType::timestamp_nanosecond_datatype(),
2574 false,
2575 ),
2576 semantic_type: SemanticType::Timestamp,
2577 column_id: 0,
2578 });
2579 metadata_builder.push_column_metadata(ColumnMetadata {
2580 column_schema: datatypes::schema::ColumnSchema::new(
2581 "file",
2582 ConcreteDataType::string_datatype(),
2583 true,
2584 ),
2585 semantic_type: SemanticType::Tag,
2586 column_id: 1,
2587 });
2588 metadata_builder.push_column_metadata(ColumnMetadata {
2589 column_schema: datatypes::schema::ColumnSchema::new(
2590 "message",
2591 ConcreteDataType::string_datatype(),
2592 true,
2593 ),
2594 semantic_type: SemanticType::Field,
2595 column_id: 2,
2596 });
2597 metadata_builder.primary_key(vec![1]);
2598 metadata_builder.build().unwrap()
2599 }
2600
2601 #[tokio::test]
2602 async fn test_handle_list_metadata_request() {
2603 common_telemetry::init_default_ut_logging();
2604
2605 let mut mock_region_server = mock_region_server();
2606 let region_id_1 = RegionId::new(1, 0);
2607 let region_id_2 = RegionId::new(2, 0);
2608
2609 let metadata_1 = mock_region_metadata(region_id_1);
2610 let metadata_2 = mock_region_metadata(region_id_2);
2611 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2612
2613 let metadata_1 = Arc::new(metadata_1);
2614 let metadata_2 = Arc::new(metadata_2);
2615 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2616 MITO_ENGINE_NAME,
2617 Box::new(move |region_id| {
2618 if region_id == region_id_1 {
2619 Ok(metadata_1.clone())
2620 } else if region_id == region_id_2 {
2621 Ok(metadata_2.clone())
2622 } else {
2623 error::RegionNotFoundSnafu { region_id }.fail()
2624 }
2625 }),
2626 );
2627
2628 mock_region_server.register_engine(engine.clone());
2629 mock_region_server
2630 .inner
2631 .region_map
2632 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2633 mock_region_server
2634 .inner
2635 .region_map
2636 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2637
2638 let list_metadata_request = ListMetadataRequest {
2640 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2641 };
2642 let response = mock_region_server
2643 .handle_list_metadata_request(&list_metadata_request)
2644 .await
2645 .unwrap();
2646 let decoded_metadata: Vec<Option<RegionMetadata>> =
2647 serde_json::from_slice(&response.metadata).unwrap();
2648 assert_eq!(metadatas, decoded_metadata);
2649 }
2650
2651 #[tokio::test]
2652 async fn test_handle_list_metadata_not_found() {
2653 common_telemetry::init_default_ut_logging();
2654
2655 let mut mock_region_server = mock_region_server();
2656 let region_id_1 = RegionId::new(1, 0);
2657 let region_id_2 = RegionId::new(2, 0);
2658
2659 let metadata_1 = mock_region_metadata(region_id_1);
2660 let metadatas = vec![Some(metadata_1.clone()), None];
2661
2662 let metadata_1 = Arc::new(metadata_1);
2663 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2664 MITO_ENGINE_NAME,
2665 Box::new(move |region_id| {
2666 if region_id == region_id_1 {
2667 Ok(metadata_1.clone())
2668 } else {
2669 error::RegionNotFoundSnafu { region_id }.fail()
2670 }
2671 }),
2672 );
2673
2674 mock_region_server.register_engine(engine.clone());
2675 mock_region_server
2676 .inner
2677 .region_map
2678 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2679
2680 let list_metadata_request = ListMetadataRequest {
2682 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2683 };
2684 let response = mock_region_server
2685 .handle_list_metadata_request(&list_metadata_request)
2686 .await
2687 .unwrap();
2688 let decoded_metadata: Vec<Option<RegionMetadata>> =
2689 serde_json::from_slice(&response.metadata).unwrap();
2690 assert_eq!(metadatas, decoded_metadata);
2691
2692 mock_region_server
2694 .inner
2695 .region_map
2696 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2697 let response = mock_region_server
2698 .handle_list_metadata_request(&list_metadata_request)
2699 .await
2700 .unwrap();
2701 let decoded_metadata: Vec<Option<RegionMetadata>> =
2702 serde_json::from_slice(&response.metadata).unwrap();
2703 assert_eq!(metadatas, decoded_metadata);
2704 }
2705
2706 #[tokio::test]
2707 async fn test_handle_list_metadata_failed() {
2708 common_telemetry::init_default_ut_logging();
2709
2710 let mut mock_region_server = mock_region_server();
2711 let region_id_1 = RegionId::new(1, 0);
2712
2713 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2714 MITO_ENGINE_NAME,
2715 Box::new(move |region_id| {
2716 error::UnexpectedSnafu {
2717 violated: format!("Failed to get region {region_id}"),
2718 }
2719 .fail()
2720 }),
2721 );
2722
2723 mock_region_server.register_engine(engine.clone());
2724 mock_region_server
2725 .inner
2726 .region_map
2727 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2728
2729 let list_metadata_request = ListMetadataRequest {
2731 region_ids: vec![region_id_1.as_u64()],
2732 };
2733 mock_region_server
2734 .handle_list_metadata_request(&list_metadata_request)
2735 .await
2736 .unwrap_err();
2737 }
2738}