1use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use api::region::RegionResponse;
22use api::v1::region::sync_request::ManifestInfo;
23use api::v1::region::{
24 region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
25};
26use api::v1::{ResponseHeader, Status};
27use arrow_flight::{FlightData, Ticket};
28use async_trait::async_trait;
29use bytes::Bytes;
30use common_error::ext::{BoxedError, ErrorExt};
31use common_error::status_code::StatusCode;
32use common_query::request::QueryRequest;
33use common_query::OutputData;
34use common_recordbatch::SendableRecordBatchStream;
35use common_runtime::Runtime;
36use common_telemetry::tracing::{self, info_span};
37use common_telemetry::tracing_context::{FutureExt, TracingContext};
38use common_telemetry::{debug, error, info, warn};
39use dashmap::DashMap;
40use datafusion::datasource::{provider_as_source, TableProvider};
41use datafusion::error::Result as DfResult;
42use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
43use datafusion_expr::{LogicalPlan, TableSource};
44use futures_util::future::try_join_all;
45use metric_engine::engine::MetricEngine;
46use mito2::engine::MITO_ENGINE_NAME;
47use prost::Message;
48pub use query::dummy_catalog::{
49 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
50};
51use query::QueryEngineRef;
52use serde_json;
53use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
54use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
55use servers::grpc::region_server::RegionServerHandler;
56use servers::grpc::FlightCompression;
57use session::context::{QueryContextBuilder, QueryContextRef};
58use snafu::{ensure, OptionExt, ResultExt};
59use store_api::metric_engine_consts::{
60 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
61};
62use store_api::region_engine::{
63 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
64 SettableRegionRoleState,
65};
66use store_api::region_request::{
67 AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
68};
69use store_api::storage::RegionId;
70use tokio::sync::{Semaphore, SemaphorePermit};
71use tokio::time::timeout;
72use tonic::{Request, Response, Result as TonicResult};
73
74use crate::error::{
75 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
76 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
77 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
78 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
79 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
80 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
81};
82use crate::event_listener::RegionServerEventListenerRef;
83
/// Cloneable handle to the region server.
///
/// All shared state lives behind an [`Arc`], so every clone operates on the same
/// `RegionServerInner`.
#[derive(Clone)]
pub struct RegionServer {
    /// Shared state: registered engines, the region map, the query engine, etc.
    inner: Arc<RegionServerInner>,
    /// Compression setting passed to each `FlightRecordBatchStream` built in `do_get`.
    flight_compression: FlightCompression,
}
89
/// Summary of a single region, as produced by `RegionServer::reportable_regions`.
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine the region is mapped to.
    pub engine: String,
    /// Role the engine reported for the region.
    pub role: RegionRole,
}
95
96impl RegionServer {
97 pub fn new(
98 query_engine: QueryEngineRef,
99 runtime: Runtime,
100 event_listener: RegionServerEventListenerRef,
101 flight_compression: FlightCompression,
102 ) -> Self {
103 Self::with_table_provider(
104 query_engine,
105 runtime,
106 event_listener,
107 Arc::new(DummyTableProviderFactory),
108 0,
109 Duration::from_millis(0),
110 flight_compression,
111 )
112 }
113
114 pub fn with_table_provider(
115 query_engine: QueryEngineRef,
116 runtime: Runtime,
117 event_listener: RegionServerEventListenerRef,
118 table_provider_factory: TableProviderFactoryRef,
119 max_concurrent_queries: usize,
120 concurrent_query_limiter_timeout: Duration,
121 flight_compression: FlightCompression,
122 ) -> Self {
123 Self {
124 inner: Arc::new(RegionServerInner::new(
125 query_engine,
126 runtime,
127 event_listener,
128 table_provider_factory,
129 RegionServerParallelism::from_opts(
130 max_concurrent_queries,
131 concurrent_query_limiter_timeout,
132 ),
133 )),
134 flight_compression,
135 }
136 }
137
138 pub fn register_engine(&mut self, engine: RegionEngineRef) {
139 self.inner.register_engine(engine);
140 }
141
142 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
144 match self.inner.get_engine(region_id, &RegionChange::None) {
145 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
146 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
147 Err(error::Error::RegionNotFound { .. }) => Ok(None),
148 Err(err) => Err(err),
149 }
150 }
151
152 #[tracing::instrument(skip_all)]
153 pub async fn handle_batch_open_requests(
154 &self,
155 parallelism: usize,
156 requests: Vec<(RegionId, RegionOpenRequest)>,
157 ) -> Result<Vec<RegionId>> {
158 self.inner
159 .handle_batch_open_requests(parallelism, requests)
160 .await
161 }
162
163 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
164 pub async fn handle_request(
165 &self,
166 region_id: RegionId,
167 request: RegionRequest,
168 ) -> Result<RegionResponse> {
169 self.inner.handle_request(region_id, request).await
170 }
171
172 async fn table_provider(
174 &self,
175 region_id: RegionId,
176 ctx: Option<&session::context::QueryContext>,
177 ) -> Result<Arc<dyn TableProvider>> {
178 let status = self
179 .inner
180 .region_map
181 .get(®ion_id)
182 .context(RegionNotFoundSnafu { region_id })?
183 .clone();
184 ensure!(
185 matches!(status, RegionEngineWithStatus::Ready(_)),
186 RegionNotReadySnafu { region_id }
187 );
188
189 self.inner
190 .table_provider_factory
191 .create(region_id, status.into_engine(), ctx)
192 .await
193 .context(ExecuteLogicalPlanSnafu)
194 }
195
196 pub async fn handle_remote_read(
198 &self,
199 request: api::v1::region::QueryRequest,
200 ) -> Result<SendableRecordBatchStream> {
201 let _permit = if let Some(p) = &self.inner.parallelism {
202 Some(p.acquire().await?)
203 } else {
204 None
205 };
206
207 let query_ctx: QueryContextRef = request
208 .header
209 .as_ref()
210 .map(|h| Arc::new(h.into()))
211 .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));
212
213 let region_id = RegionId::from_u64(request.region_id);
214 let provider = self.table_provider(region_id, Some(&query_ctx)).await?;
215 let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
216
217 let decoder = self
218 .inner
219 .query_engine
220 .engine_context(query_ctx)
221 .new_plan_decoder()
222 .context(NewPlanDecoderSnafu)?;
223
224 let plan = decoder
225 .decode(Bytes::from(request.plan), catalog_list, false)
226 .await
227 .context(DecodeLogicalPlanSnafu)?;
228
229 self.inner
230 .handle_read(QueryRequest {
231 header: request.header,
232 region_id,
233 plan,
234 })
235 .await
236 }
237
238 #[tracing::instrument(skip_all)]
239 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
240 let _permit = if let Some(p) = &self.inner.parallelism {
241 Some(p.acquire().await?)
242 } else {
243 None
244 };
245
246 let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
247
248 let provider = self.table_provider(request.region_id, ctx.as_ref()).await?;
249
250 struct RegionDataSourceInjector {
251 source: Arc<dyn TableSource>,
252 }
253
254 impl TreeNodeRewriter for RegionDataSourceInjector {
255 type Node = LogicalPlan;
256
257 fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
258 Ok(match node {
259 LogicalPlan::TableScan(mut scan) => {
260 scan.source = self.source.clone();
261 Transformed::yes(LogicalPlan::TableScan(scan))
262 }
263 _ => Transformed::no(node),
264 })
265 }
266 }
267
268 let plan = request
269 .plan
270 .rewrite(&mut RegionDataSourceInjector {
271 source: provider_as_source(provider),
272 })
273 .context(DataFusionSnafu)?
274 .data;
275
276 self.inner
277 .handle_read(QueryRequest { plan, ..request })
278 .await
279 }
280
281 pub fn reportable_regions(&self) -> Vec<RegionStat> {
285 self.inner
286 .region_map
287 .iter()
288 .filter_map(|e| {
289 let region_id = *e.key();
290 e.role(region_id).map(|role| RegionStat {
292 region_id,
293 engine: e.value().name().to_string(),
294 role,
295 })
296 })
297 .collect()
298 }
299
300 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
301 self.inner.region_map.get(®ion_id).and_then(|engine| {
302 engine.role(region_id).map(|role| match role {
303 RegionRole::Follower => false,
304 RegionRole::Leader => true,
305 RegionRole::DowngradingLeader => true,
306 })
307 })
308 }
309
310 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
311 let engine = self
312 .inner
313 .region_map
314 .get(®ion_id)
315 .with_context(|| RegionNotFoundSnafu { region_id })?;
316 engine
317 .set_region_role(region_id, role)
318 .with_context(|_| HandleRegionRequestSnafu { region_id })
319 }
320
321 pub async fn set_region_role_state_gracefully(
331 &self,
332 region_id: RegionId,
333 state: SettableRegionRoleState,
334 ) -> Result<SetRegionRoleStateResponse> {
335 match self.inner.region_map.get(®ion_id) {
336 Some(engine) => Ok(engine
337 .set_region_role_state_gracefully(region_id, state)
338 .await
339 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
340 None => Ok(SetRegionRoleStateResponse::NotFound),
341 }
342 }
343
344 pub fn runtime(&self) -> Runtime {
345 self.inner.runtime.clone()
346 }
347
348 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
349 match self.inner.region_map.get(®ion_id) {
350 Some(e) => e.region_statistic(region_id),
351 None => None,
352 }
353 }
354
355 pub async fn stop(&self) -> Result<()> {
357 self.inner.stop().await
358 }
359
360 #[cfg(test)]
361 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
363 self.inner
364 .region_map
365 .insert(region_id, RegionEngineWithStatus::Ready(engine));
366 }
367
368 async fn handle_batch_ddl_requests(
369 &self,
370 request: region_request::Body,
371 ) -> Result<RegionResponse> {
372 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
374 .context(BuildRegionRequestsSnafu)?
375 .unwrap();
376 let tracing_context = TracingContext::from_current_span();
377
378 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
379 self.inner
380 .handle_batch_request(batch_request)
381 .trace(span)
382 .await
383 }
384
385 async fn handle_requests_in_parallel(
386 &self,
387 request: region_request::Body,
388 ) -> Result<RegionResponse> {
389 let requests =
390 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
391 let tracing_context = TracingContext::from_current_span();
392
393 let join_tasks = requests.into_iter().map(|(region_id, req)| {
394 let self_to_move = self;
395 let span = tracing_context.attach(info_span!(
396 "RegionServer::handle_region_request",
397 region_id = region_id.to_string()
398 ));
399 async move {
400 self_to_move
401 .handle_request(region_id, req)
402 .trace(span)
403 .await
404 }
405 });
406
407 let results = try_join_all(join_tasks).await?;
408 let mut affected_rows = 0;
409 let mut extensions = HashMap::new();
410 for result in results {
411 affected_rows += result.affected_rows;
412 extensions.extend(result.extensions);
413 }
414
415 Ok(RegionResponse {
416 affected_rows,
417 extensions,
418 metadata: Vec::new(),
419 })
420 }
421
422 async fn handle_requests_in_serial(
423 &self,
424 request: region_request::Body,
425 ) -> Result<RegionResponse> {
426 let requests =
427 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
428 let tracing_context = TracingContext::from_current_span();
429
430 let mut affected_rows = 0;
431 let mut extensions = HashMap::new();
432 for (region_id, req) in requests {
435 let span = tracing_context.attach(info_span!(
436 "RegionServer::handle_region_request",
437 region_id = region_id.to_string()
438 ));
439 let result = self.handle_request(region_id, req).trace(span).await?;
440
441 affected_rows += result.affected_rows;
442 extensions.extend(result.extensions);
443 }
444
445 Ok(RegionResponse {
446 affected_rows,
447 extensions,
448 metadata: Vec::new(),
449 })
450 }
451
452 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
453 let region_id = RegionId::from_u64(request.region_id);
454 let manifest_info = request
455 .manifest_info
456 .context(error::MissingRequiredFieldSnafu {
457 name: "manifest_info",
458 })?;
459
460 let manifest_info = match manifest_info {
461 ManifestInfo::MitoManifestInfo(info) => {
462 RegionManifestInfo::mito(info.data_manifest_version, 0)
463 }
464 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
465 info.data_manifest_version,
466 0,
467 info.metadata_manifest_version,
468 0,
469 ),
470 };
471
472 let tracing_context = TracingContext::from_current_span();
473 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
474
475 self.sync_region(region_id, manifest_info)
476 .trace(span)
477 .await
478 .map(|_| RegionResponse::new(AffectedRows::default()))
479 }
480
481 #[tracing::instrument(skip_all)]
486 async fn handle_list_metadata_request(
487 &self,
488 request: &ListMetadataRequest,
489 ) -> Result<RegionResponse> {
490 let mut region_metadatas = Vec::new();
491 for region_id in &request.region_ids {
493 let region_id = RegionId::from_u64(*region_id);
494 let Some(engine) = self.find_engine(region_id)? else {
496 region_metadatas.push(None);
497 continue;
498 };
499
500 match engine.get_metadata(region_id).await {
501 Ok(metadata) => region_metadatas.push(Some(metadata)),
502 Err(err) => {
503 if err.status_code() == StatusCode::RegionNotFound {
504 region_metadatas.push(None);
505 } else {
506 Err(err).with_context(|_| GetRegionMetadataSnafu {
507 engine: engine.name(),
508 region_id,
509 })?;
510 }
511 }
512 }
513 }
514
515 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
517
518 let response = RegionResponse::from_metadata(json_result);
519
520 Ok(response)
521 }
522
523 pub async fn sync_region(
525 &self,
526 region_id: RegionId,
527 manifest_info: RegionManifestInfo,
528 ) -> Result<()> {
529 let engine_with_status = self
530 .inner
531 .region_map
532 .get(®ion_id)
533 .with_context(|| RegionNotFoundSnafu { region_id })?;
534
535 self.inner
536 .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
537 .await
538 }
539}
540
541#[async_trait]
542impl RegionServerHandler for RegionServer {
543 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
544 let response = match &request {
545 region_request::Body::Creates(_)
546 | region_request::Body::Drops(_)
547 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
548 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
549 self.handle_requests_in_parallel(request).await
550 }
551 region_request::Body::Sync(sync_request) => {
552 self.handle_sync_region_request(sync_request).await
553 }
554 region_request::Body::ListMetadata(list_metadata_request) => {
555 self.handle_list_metadata_request(list_metadata_request)
556 .await
557 }
558 _ => self.handle_requests_in_serial(request).await,
559 }
560 .map_err(BoxedError::new)
561 .context(ExecuteGrpcRequestSnafu)?;
562
563 Ok(RegionResponseV1 {
564 header: Some(ResponseHeader {
565 status: Some(Status {
566 status_code: StatusCode::Success as _,
567 ..Default::default()
568 }),
569 }),
570 affected_rows: response.affected_rows as _,
571 extensions: response.extensions,
572 metadata: response.metadata,
573 })
574 }
575}
576
577#[async_trait]
578impl FlightCraft for RegionServer {
579 async fn do_get(
580 &self,
581 request: Request<Ticket>,
582 ) -> TonicResult<Response<TonicStream<FlightData>>> {
583 let ticket = request.into_inner().ticket;
584 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
585 .context(servers_error::InvalidFlightTicketSnafu)?;
586 let tracing_context = request
587 .header
588 .as_ref()
589 .map(|h| TracingContext::from_w3c(&h.tracing_context))
590 .unwrap_or_default();
591
592 let result = self
593 .handle_remote_read(request)
594 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
595 .await?;
596
597 let stream = Box::pin(FlightRecordBatchStream::new(
598 result,
599 tracing_context,
600 self.flight_compression,
601 ));
602 Ok(Response::new(stream))
603 }
604}
605
/// A region's engine together with the region's lifecycle state in the region map.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// Set while a `RegionChange::Register` request (create/open) is in flight.
    Registering(RegionEngineRef),
    /// Set while a `RegionChange::Deregisters` request (close/drop) is in flight.
    Deregistering(RegionEngineRef),
    /// The region is registered and can serve requests.
    Ready(RegionEngineRef),
}
615
616impl RegionEngineWithStatus {
617 pub fn into_engine(self) -> RegionEngineRef {
619 match self {
620 RegionEngineWithStatus::Registering(engine) => engine,
621 RegionEngineWithStatus::Deregistering(engine) => engine,
622 RegionEngineWithStatus::Ready(engine) => engine,
623 }
624 }
625
626 pub fn engine(&self) -> &RegionEngineRef {
628 match self {
629 RegionEngineWithStatus::Registering(engine) => engine,
630 RegionEngineWithStatus::Deregistering(engine) => engine,
631 RegionEngineWithStatus::Ready(engine) => engine,
632 }
633 }
634}
635
636impl Deref for RegionEngineWithStatus {
637 type Target = RegionEngineRef;
638
639 fn deref(&self) -> &Self::Target {
640 match self {
641 RegionEngineWithStatus::Registering(engine) => engine,
642 RegionEngineWithStatus::Deregistering(engine) => engine,
643 RegionEngineWithStatus::Ready(engine) => engine,
644 }
645 }
646}
647
/// Shared state behind every [`RegionServer`] handle.
struct RegionServerInner {
    /// Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Engine and lifecycle state of every known region.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine used to decode and execute read plans.
    query_engine: QueryEngineRef,
    /// Runtime handed out by `RegionServer::runtime`.
    runtime: Runtime,
    /// Listener notified when regions are registered or deregistered.
    event_listener: RegionServerEventListenerRef,
    /// Factory that builds the table provider for a region on reads.
    table_provider_factory: TableProviderFactoryRef,
    /// Concurrent-read limiter; `None` means reads are not limited.
    parallelism: Option<RegionServerParallelism>,
}
659
/// Limiter for the number of reads handled concurrently.
struct RegionServerParallelism {
    /// One permit per allowed concurrent read.
    semaphore: Semaphore,
    /// Longest time `acquire` waits for a permit before failing.
    timeout: Duration,
}
664
665impl RegionServerParallelism {
666 pub fn from_opts(
667 max_concurrent_queries: usize,
668 concurrent_query_limiter_timeout: Duration,
669 ) -> Option<Self> {
670 if max_concurrent_queries == 0 {
671 return None;
672 }
673 Some(RegionServerParallelism {
674 semaphore: Semaphore::new(max_concurrent_queries),
675 timeout: concurrent_query_limiter_timeout,
676 })
677 }
678
679 pub async fn acquire(&self) -> Result<SemaphorePermit> {
680 timeout(self.timeout, self.semaphore.acquire())
681 .await
682 .context(ConcurrentQueryLimiterTimeoutSnafu)?
683 .context(ConcurrentQueryLimiterClosedSnafu)
684 }
685}
686
/// Outcome of `RegionServerInner::get_engine`.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// The request needs no engine call; respond right away with this many affected rows.
    EarlyReturn(AffectedRows),
}
691
692impl Debug for CurrentEngine {
693 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
694 match self {
695 CurrentEngine::Engine(engine) => f
696 .debug_struct("CurrentEngine")
697 .field("engine", &engine.name())
698 .finish(),
699 CurrentEngine::EarlyReturn(rows) => f
700 .debug_struct("CurrentEngine")
701 .field("return", rows)
702 .finish(),
703 }
704 }
705}
706
707impl RegionServerInner {
708 pub fn new(
709 query_engine: QueryEngineRef,
710 runtime: Runtime,
711 event_listener: RegionServerEventListenerRef,
712 table_provider_factory: TableProviderFactoryRef,
713 parallelism: Option<RegionServerParallelism>,
714 ) -> Self {
715 Self {
716 engines: RwLock::new(HashMap::new()),
717 region_map: DashMap::new(),
718 query_engine,
719 runtime,
720 event_listener,
721 table_provider_factory,
722 parallelism,
723 }
724 }
725
726 pub fn register_engine(&self, engine: RegionEngineRef) {
727 let engine_name = engine.name();
728 info!("Region Engine {engine_name} is registered");
729 self.engines
730 .write()
731 .unwrap()
732 .insert(engine_name.to_string(), engine);
733 }
734
735 fn get_engine(
736 &self,
737 region_id: RegionId,
738 region_change: &RegionChange,
739 ) -> Result<CurrentEngine> {
740 let current_region_status = self.region_map.get(®ion_id);
741
742 let engine = match region_change {
743 RegionChange::Register(attribute) => match current_region_status {
744 Some(status) => match status.clone() {
745 RegionEngineWithStatus::Registering(engine) => engine,
746 RegionEngineWithStatus::Deregistering(_) => {
747 return error::RegionBusySnafu { region_id }.fail()
748 }
749 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
750 },
751 _ => self
752 .engines
753 .read()
754 .unwrap()
755 .get(attribute.engine())
756 .with_context(|| RegionEngineNotFoundSnafu {
757 name: attribute.engine(),
758 })?
759 .clone(),
760 },
761 RegionChange::Deregisters => match current_region_status {
762 Some(status) => match status.clone() {
763 RegionEngineWithStatus::Registering(_) => {
764 return error::RegionBusySnafu { region_id }.fail()
765 }
766 RegionEngineWithStatus::Deregistering(_) => {
767 return Ok(CurrentEngine::EarlyReturn(0))
768 }
769 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
770 },
771 None => return Ok(CurrentEngine::EarlyReturn(0)),
772 },
773 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
774 match current_region_status {
775 Some(status) => match status.clone() {
776 RegionEngineWithStatus::Registering(_) => {
777 return error::RegionNotReadySnafu { region_id }.fail()
778 }
779 RegionEngineWithStatus::Deregistering(_) => {
780 return error::RegionNotFoundSnafu { region_id }.fail()
781 }
782 RegionEngineWithStatus::Ready(engine) => engine,
783 },
784 None => return error::RegionNotFoundSnafu { region_id }.fail(),
785 }
786 }
787 };
788
789 Ok(CurrentEngine::Engine(engine))
790 }
791
792 async fn handle_batch_open_requests_inner(
793 &self,
794 engine: RegionEngineRef,
795 parallelism: usize,
796 requests: Vec<(RegionId, RegionOpenRequest)>,
797 ) -> Result<Vec<RegionId>> {
798 let region_changes = requests
799 .iter()
800 .map(|(region_id, open)| {
801 let attribute = parse_region_attribute(&open.engine, &open.options)?;
802 Ok((*region_id, RegionChange::Register(attribute)))
803 })
804 .collect::<Result<HashMap<_, _>>>()?;
805
806 for (®ion_id, region_change) in ®ion_changes {
807 self.set_region_status_not_ready(region_id, &engine, region_change)
808 }
809
810 let mut open_regions = Vec::with_capacity(requests.len());
811 let mut errors = vec![];
812 match engine
813 .handle_batch_open_requests(parallelism, requests)
814 .await
815 .with_context(|_| HandleBatchOpenRequestSnafu)
816 {
817 Ok(results) => {
818 for (region_id, result) in results {
819 let region_change = ®ion_changes[®ion_id];
820 match result {
821 Ok(_) => {
822 if let Err(e) = self
823 .set_region_status_ready(region_id, engine.clone(), *region_change)
824 .await
825 {
826 error!(e; "Failed to set region to ready: {}", region_id);
827 errors.push(BoxedError::new(e));
828 } else {
829 open_regions.push(region_id)
830 }
831 }
832 Err(e) => {
833 self.unset_region_status(region_id, &engine, *region_change);
834 error!(e; "Failed to open region: {}", region_id);
835 errors.push(e);
836 }
837 }
838 }
839 }
840 Err(e) => {
841 for (®ion_id, region_change) in ®ion_changes {
842 self.unset_region_status(region_id, &engine, *region_change);
843 }
844 error!(e; "Failed to open batch regions");
845 errors.push(BoxedError::new(e));
846 }
847 }
848
849 if !errors.is_empty() {
850 return error::UnexpectedSnafu {
851 violated: format!("Failed to open batch regions: {:?}", errors[0]),
853 }
854 .fail();
855 }
856
857 Ok(open_regions)
858 }
859
860 pub async fn handle_batch_open_requests(
861 &self,
862 parallelism: usize,
863 requests: Vec<(RegionId, RegionOpenRequest)>,
864 ) -> Result<Vec<RegionId>> {
865 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
866 HashMap::with_capacity(requests.len());
867 for (region_id, request) in requests {
868 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
869 requests.push((region_id, request));
870 } else {
871 engine_grouped_requests
872 .insert(request.engine.to_string(), vec![(region_id, request)]);
873 }
874 }
875
876 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
877 for (engine, requests) in engine_grouped_requests {
878 let engine = self
879 .engines
880 .read()
881 .unwrap()
882 .get(&engine)
883 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
884 .clone();
885 results.push(
886 self.handle_batch_open_requests_inner(engine, parallelism, requests)
887 .await,
888 )
889 }
890
891 Ok(results
892 .into_iter()
893 .collect::<Result<Vec<_>>>()?
894 .into_iter()
895 .flatten()
896 .collect::<Vec<_>>())
897 }
898
899 pub async fn handle_batch_request(
903 &self,
904 batch_request: BatchRegionDdlRequest,
905 ) -> Result<RegionResponse> {
906 let region_changes = match &batch_request {
907 BatchRegionDdlRequest::Create(requests) => requests
908 .iter()
909 .map(|(region_id, create)| {
910 let attribute = parse_region_attribute(&create.engine, &create.options)?;
911 Ok((*region_id, RegionChange::Register(attribute)))
912 })
913 .collect::<Result<Vec<_>>>()?,
914 BatchRegionDdlRequest::Drop(requests) => requests
915 .iter()
916 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
917 .collect::<Vec<_>>(),
918 BatchRegionDdlRequest::Alter(requests) => requests
919 .iter()
920 .map(|(region_id, _)| (*region_id, RegionChange::None))
921 .collect::<Vec<_>>(),
922 };
923
924 let (first_region_id, first_region_change) = region_changes.first().unwrap();
927 let engine = match self.get_engine(*first_region_id, first_region_change)? {
928 CurrentEngine::Engine(engine) => engine,
929 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
930 };
931
932 for (region_id, region_change) in region_changes.iter() {
933 self.set_region_status_not_ready(*region_id, &engine, region_change);
934 }
935
936 let ddl_type = batch_request.request_type();
937 let result = engine
938 .handle_batch_ddl_requests(batch_request)
939 .await
940 .context(HandleBatchDdlRequestSnafu { ddl_type });
941
942 match result {
943 Ok(result) => {
944 for (region_id, region_change) in ®ion_changes {
945 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
946 .await?;
947 }
948
949 Ok(RegionResponse {
950 affected_rows: result.affected_rows,
951 extensions: result.extensions,
952 metadata: Vec::new(),
953 })
954 }
955 Err(err) => {
956 for (region_id, region_change) in region_changes {
957 self.unset_region_status(region_id, &engine, region_change);
958 }
959
960 Err(err)
961 }
962 }
963 }
964
965 pub async fn handle_request(
966 &self,
967 region_id: RegionId,
968 request: RegionRequest,
969 ) -> Result<RegionResponse> {
970 let request_type = request.request_type();
971 let region_id_str = region_id.to_string();
972 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
973 .with_label_values(&[®ion_id_str, request_type])
974 .start_timer();
975
976 let region_change = match &request {
977 RegionRequest::Create(create) => {
978 let attribute = parse_region_attribute(&create.engine, &create.options)?;
979 RegionChange::Register(attribute)
980 }
981 RegionRequest::Open(open) => {
982 let attribute = parse_region_attribute(&open.engine, &open.options)?;
983 RegionChange::Register(attribute)
984 }
985 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
986 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
987 RegionChange::Ingest
988 }
989 RegionRequest::Alter(_)
990 | RegionRequest::Flush(_)
991 | RegionRequest::Compact(_)
992 | RegionRequest::Truncate(_) => RegionChange::None,
993 RegionRequest::Catchup(_) => RegionChange::Catchup,
994 };
995
996 let engine = match self.get_engine(region_id, ®ion_change)? {
997 CurrentEngine::Engine(engine) => engine,
998 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
999 };
1000
1001 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1003
1004 match engine
1005 .handle_request(region_id, request)
1006 .await
1007 .with_context(|_| HandleRegionRequestSnafu { region_id })
1008 {
1009 Ok(result) => {
1010 if matches!(region_change, RegionChange::Ingest) {
1012 crate::metrics::REGION_CHANGED_ROW_COUNT
1013 .with_label_values(&[®ion_id_str, request_type])
1014 .inc_by(result.affected_rows as u64);
1015 }
1016 self.set_region_status_ready(region_id, engine.clone(), region_change)
1018 .await?;
1019
1020 Ok(RegionResponse {
1021 affected_rows: result.affected_rows,
1022 extensions: result.extensions,
1023 metadata: Vec::new(),
1024 })
1025 }
1026 Err(err) => {
1027 self.unset_region_status(region_id, &engine, region_change);
1029 Err(err)
1030 }
1031 }
1032 }
1033
1034 pub async fn handle_sync_region(
1036 &self,
1037 engine: &RegionEngineRef,
1038 region_id: RegionId,
1039 manifest_info: RegionManifestInfo,
1040 ) -> Result<()> {
1041 let Some(new_opened_regions) = engine
1042 .sync_region(region_id, manifest_info)
1043 .await
1044 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1045 .new_opened_logical_region_ids()
1046 else {
1047 warn!("No new opened logical regions");
1048 return Ok(());
1049 };
1050
1051 for region in new_opened_regions {
1052 self.region_map
1053 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1054 info!("Logical region {} is registered!", region);
1055 }
1056
1057 Ok(())
1058 }
1059
1060 fn set_region_status_not_ready(
1061 &self,
1062 region_id: RegionId,
1063 engine: &RegionEngineRef,
1064 region_change: &RegionChange,
1065 ) {
1066 match region_change {
1067 RegionChange::Register(_) => {
1068 self.region_map.insert(
1069 region_id,
1070 RegionEngineWithStatus::Registering(engine.clone()),
1071 );
1072 }
1073 RegionChange::Deregisters => {
1074 self.region_map.insert(
1075 region_id,
1076 RegionEngineWithStatus::Deregistering(engine.clone()),
1077 );
1078 }
1079 _ => {}
1080 }
1081 }
1082
1083 fn unset_region_status(
1084 &self,
1085 region_id: RegionId,
1086 engine: &RegionEngineRef,
1087 region_change: RegionChange,
1088 ) {
1089 match region_change {
1090 RegionChange::None | RegionChange::Ingest => {}
1091 RegionChange::Register(_) => {
1092 self.region_map.remove(®ion_id);
1093 }
1094 RegionChange::Deregisters => {
1095 self.region_map
1096 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1097 }
1098 RegionChange::Catchup => {}
1099 }
1100 }
1101
1102 async fn set_region_status_ready(
1103 &self,
1104 region_id: RegionId,
1105 engine: RegionEngineRef,
1106 region_change: RegionChange,
1107 ) -> Result<()> {
1108 let engine_type = engine.name();
1109 match region_change {
1110 RegionChange::None | RegionChange::Ingest => {}
1111 RegionChange::Register(attribute) => {
1112 info!(
1113 "Region {region_id} is registered to engine {}",
1114 attribute.engine()
1115 );
1116 self.region_map
1117 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1118
1119 match attribute {
1120 RegionAttribute::Metric { physical } => {
1121 if physical {
1122 self.register_logical_regions(&engine, region_id).await?;
1124 self.event_listener.on_region_registered(region_id);
1126 }
1127 }
1128 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1129 RegionAttribute::File => {
1130 }
1132 }
1133 }
1134 RegionChange::Deregisters => {
1135 info!("Region {region_id} is deregistered from engine {engine_type}");
1136 self.region_map
1137 .remove(®ion_id)
1138 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1139 self.event_listener.on_region_deregistered(region_id);
1140 }
1141 RegionChange::Catchup => {
1142 if is_metric_engine(engine.name()) {
1143 self.register_logical_regions(&engine, region_id).await?;
1145 }
1146 }
1147 }
1148 Ok(())
1149 }
1150
1151 async fn register_logical_regions(
1152 &self,
1153 engine: &RegionEngineRef,
1154 physical_region_id: RegionId,
1155 ) -> Result<()> {
1156 let metric_engine =
1157 engine
1158 .as_any()
1159 .downcast_ref::<MetricEngine>()
1160 .context(UnexpectedSnafu {
1161 violated: format!(
1162 "expecting engine type '{}', actual '{}'",
1163 METRIC_ENGINE_NAME,
1164 engine.name(),
1165 ),
1166 })?;
1167
1168 let logical_regions = metric_engine
1169 .logical_regions(physical_region_id)
1170 .await
1171 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1172
1173 for region in logical_regions {
1174 self.region_map
1175 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1176 info!("Logical region {} is registered!", region);
1177 }
1178 Ok(())
1179 }
1180
1181 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
1182 let query_ctx: QueryContextRef = request
1186 .header
1187 .as_ref()
1188 .map(|h| Arc::new(h.into()))
1189 .unwrap_or_else(|| QueryContextBuilder::default().build().into());
1190
1191 let result = self
1192 .query_engine
1193 .execute(request.plan, query_ctx)
1194 .await
1195 .context(ExecuteLogicalPlanSnafu)?;
1196
1197 match result.data {
1198 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1199 UnsupportedOutputSnafu { expected: "stream" }.fail()
1200 }
1201 OutputData::Stream(stream) => Ok(stream),
1202 }
1203 }
1204
1205 async fn stop(&self) -> Result<()> {
1206 let regions = self
1214 .region_map
1215 .iter()
1216 .map(|x| (*x.key(), x.value().clone()))
1217 .collect::<Vec<_>>();
1218 let num_regions = regions.len();
1219
1220 for (region_id, engine) in regions {
1221 let closed = engine
1222 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1223 .await;
1224 match closed {
1225 Ok(_) => debug!("Region {region_id} is closed"),
1226 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1227 }
1228 }
1229 self.region_map.clear();
1230 info!("closed {num_regions} regions");
1231
1232 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1233 for (engine_name, engine) in engines {
1234 engine
1235 .stop()
1236 .await
1237 .context(StopRegionEngineSnafu { name: &engine_name })?;
1238 info!("Region engine {engine_name} is stopped");
1239 }
1240
1241 Ok(())
1242 }
1243}
1244
/// The effect a request has on a region's entry in the region map.
///
/// `unset_region_status` and `set_region_status_ready` match on this value to
/// decide how the map is updated.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The region map is not modified.
    None,
    /// The region is being registered; once ready it is stored as `Ready`.
    /// Carries the attribute of the region being registered.
    Register(RegionAttribute),
    /// The region is being deregistered; once done it is removed from the map.
    Deregisters,
    /// On completion, a metric engine region gets its logical regions
    /// registered; the region's own map entry is not changed.
    Catchup,
    /// Handled exactly like `None` by the status helpers in this file.
    Ingest,
}
1253
/// Returns `true` when `engine` equals `METRIC_ENGINE_NAME`.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1257
1258fn parse_region_attribute(
1259 engine: &str,
1260 options: &HashMap<String, String>,
1261) -> Result<RegionAttribute> {
1262 match engine {
1263 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1264 METRIC_ENGINE_NAME => {
1265 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1266
1267 Ok(RegionAttribute::Metric { physical })
1268 }
1269 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1270 _ => error::UnexpectedSnafu {
1271 violated: format!("Unknown engine: {}", engine),
1272 }
1273 .fail(),
1274 }
1275}
1276
/// Describes which engine a region belongs to, as produced by
/// `parse_region_attribute`.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region of the engine named `MITO_ENGINE_NAME`.
    Mito,
    /// A region of the engine named `METRIC_ENGINE_NAME`.
    ///
    /// `physical` is `true` when the region options do not contain
    /// `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region of the engine named `FILE_ENGINE_NAME`.
    File,
}
1283
1284impl RegionAttribute {
1285 fn engine(&self) -> &'static str {
1286 match self {
1287 RegionAttribute::Mito => MITO_ENGINE_NAME,
1288 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1289 RegionAttribute::File => FILE_ENGINE_NAME,
1290 }
1291 }
1292}
1293
1294#[cfg(test)]
1295mod tests {
1296
1297 use std::assert_matches::assert_matches;
1298
1299 use api::v1::SemanticType;
1300 use common_error::ext::ErrorExt;
1301 use datatypes::prelude::ConcreteDataType;
1302 use mito2::test_util::CreateRequestBuilder;
1303 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1304 use store_api::region_engine::RegionEngine;
1305 use store_api::region_request::{RegionDropRequest, RegionOpenRequest, RegionTruncateRequest};
1306 use store_api::storage::RegionId;
1307
1308 use super::*;
1309 use crate::error::Result;
1310 use crate::tests::{mock_region_server, MockRegionEngine};
1311
1312 #[tokio::test]
1313 async fn test_region_registering() {
1314 common_telemetry::init_default_ut_logging();
1315
1316 let mut mock_region_server = mock_region_server();
1317 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1318 let engine_name = engine.name();
1319 mock_region_server.register_engine(engine.clone());
1320 let region_id = RegionId::new(1, 1);
1321 let builder = CreateRequestBuilder::new();
1322 let create_req = builder.build();
1323 mock_region_server.inner.region_map.insert(
1325 region_id,
1326 RegionEngineWithStatus::Registering(engine.clone()),
1327 );
1328 let response = mock_region_server
1329 .handle_request(region_id, RegionRequest::Create(create_req))
1330 .await
1331 .unwrap();
1332 assert_eq!(response.affected_rows, 0);
1333 let status = mock_region_server
1334 .inner
1335 .region_map
1336 .get(®ion_id)
1337 .unwrap()
1338 .clone();
1339 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1340
1341 mock_region_server.inner.region_map.insert(
1342 region_id,
1343 RegionEngineWithStatus::Registering(engine.clone()),
1344 );
1345 let response = mock_region_server
1346 .handle_request(
1347 region_id,
1348 RegionRequest::Open(RegionOpenRequest {
1349 engine: engine_name.to_string(),
1350 region_dir: String::new(),
1351 options: Default::default(),
1352 skip_wal_replay: false,
1353 }),
1354 )
1355 .await
1356 .unwrap();
1357 assert_eq!(response.affected_rows, 0);
1358 let status = mock_region_server
1359 .inner
1360 .region_map
1361 .get(®ion_id)
1362 .unwrap()
1363 .clone();
1364 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1365 }
1366
1367 #[tokio::test]
1368 async fn test_region_deregistering() {
1369 common_telemetry::init_default_ut_logging();
1370
1371 let mut mock_region_server = mock_region_server();
1372 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1373
1374 mock_region_server.register_engine(engine.clone());
1375
1376 let region_id = RegionId::new(1, 1);
1377
1378 mock_region_server.inner.region_map.insert(
1380 region_id,
1381 RegionEngineWithStatus::Deregistering(engine.clone()),
1382 );
1383
1384 let response = mock_region_server
1385 .handle_request(
1386 region_id,
1387 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1388 )
1389 .await
1390 .unwrap();
1391 assert_eq!(response.affected_rows, 0);
1392
1393 let status = mock_region_server
1394 .inner
1395 .region_map
1396 .get(®ion_id)
1397 .unwrap()
1398 .clone();
1399 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1400
1401 mock_region_server.inner.region_map.insert(
1402 region_id,
1403 RegionEngineWithStatus::Deregistering(engine.clone()),
1404 );
1405
1406 let response = mock_region_server
1407 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1408 .await
1409 .unwrap();
1410 assert_eq!(response.affected_rows, 0);
1411
1412 let status = mock_region_server
1413 .inner
1414 .region_map
1415 .get(®ion_id)
1416 .unwrap()
1417 .clone();
1418 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1419 }
1420
1421 #[tokio::test]
1422 async fn test_region_not_ready() {
1423 common_telemetry::init_default_ut_logging();
1424
1425 let mut mock_region_server = mock_region_server();
1426 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1427
1428 mock_region_server.register_engine(engine.clone());
1429
1430 let region_id = RegionId::new(1, 1);
1431
1432 mock_region_server.inner.region_map.insert(
1434 region_id,
1435 RegionEngineWithStatus::Registering(engine.clone()),
1436 );
1437
1438 let err = mock_region_server
1439 .handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
1440 .await
1441 .unwrap_err();
1442
1443 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1444 }
1445
1446 #[tokio::test]
1447 async fn test_region_request_failed() {
1448 common_telemetry::init_default_ut_logging();
1449
1450 let mut mock_region_server = mock_region_server();
1451 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1452 MITO_ENGINE_NAME,
1453 Box::new(|_region_id, _request| {
1454 error::UnexpectedSnafu {
1455 violated: "test".to_string(),
1456 }
1457 .fail()
1458 }),
1459 );
1460
1461 mock_region_server.register_engine(engine.clone());
1462
1463 let region_id = RegionId::new(1, 1);
1464 let builder = CreateRequestBuilder::new();
1465 let create_req = builder.build();
1466 mock_region_server
1467 .handle_request(region_id, RegionRequest::Create(create_req))
1468 .await
1469 .unwrap_err();
1470
1471 let status = mock_region_server.inner.region_map.get(®ion_id);
1472 assert!(status.is_none());
1473
1474 mock_region_server
1475 .inner
1476 .region_map
1477 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1478
1479 mock_region_server
1480 .handle_request(
1481 region_id,
1482 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1483 )
1484 .await
1485 .unwrap_err();
1486
1487 let status = mock_region_server.inner.region_map.get(®ion_id);
1488 assert!(status.is_some());
1489 }
1490
    /// One case of the table-driven `test_current_engine` test.
    struct CurrentEngineTest {
        /// Region whose engine is looked up.
        region_id: RegionId,
        /// Status placed in the region map before the lookup; `None` means
        /// the region is removed from the map instead.
        current_region_status: Option<RegionEngineWithStatus>,
        /// The region change passed to `get_engine`.
        region_change: RegionChange,
        /// Checks the result returned by `get_engine`.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
1497
1498 #[tokio::test]
1499 async fn test_current_engine() {
1500 common_telemetry::init_default_ut_logging();
1501
1502 let mut mock_region_server = mock_region_server();
1503 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1504 mock_region_server.register_engine(engine.clone());
1505
1506 let region_id = RegionId::new(1024, 1);
1507 let tests = vec![
1508 CurrentEngineTest {
1510 region_id,
1511 current_region_status: None,
1512 region_change: RegionChange::None,
1513 assert: Box::new(|result| {
1514 let err = result.unwrap_err();
1515 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1516 }),
1517 },
1518 CurrentEngineTest {
1519 region_id,
1520 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1521 region_change: RegionChange::None,
1522 assert: Box::new(|result| {
1523 let current_engine = result.unwrap();
1524 assert_matches!(current_engine, CurrentEngine::Engine(_));
1525 }),
1526 },
1527 CurrentEngineTest {
1528 region_id,
1529 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1530 region_change: RegionChange::None,
1531 assert: Box::new(|result| {
1532 let err = result.unwrap_err();
1533 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1534 }),
1535 },
1536 CurrentEngineTest {
1537 region_id,
1538 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1539 region_change: RegionChange::None,
1540 assert: Box::new(|result| {
1541 let err = result.unwrap_err();
1542 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1543 }),
1544 },
1545 CurrentEngineTest {
1547 region_id,
1548 current_region_status: None,
1549 region_change: RegionChange::Register(RegionAttribute::Mito),
1550 assert: Box::new(|result| {
1551 let current_engine = result.unwrap();
1552 assert_matches!(current_engine, CurrentEngine::Engine(_));
1553 }),
1554 },
1555 CurrentEngineTest {
1556 region_id,
1557 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1558 region_change: RegionChange::Register(RegionAttribute::Mito),
1559 assert: Box::new(|result| {
1560 let current_engine = result.unwrap();
1561 assert_matches!(current_engine, CurrentEngine::Engine(_));
1562 }),
1563 },
1564 CurrentEngineTest {
1565 region_id,
1566 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1567 region_change: RegionChange::Register(RegionAttribute::Mito),
1568 assert: Box::new(|result| {
1569 let err = result.unwrap_err();
1570 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1571 }),
1572 },
1573 CurrentEngineTest {
1574 region_id,
1575 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1576 region_change: RegionChange::Register(RegionAttribute::Mito),
1577 assert: Box::new(|result| {
1578 let current_engine = result.unwrap();
1579 assert_matches!(current_engine, CurrentEngine::Engine(_));
1580 }),
1581 },
1582 CurrentEngineTest {
1584 region_id,
1585 current_region_status: None,
1586 region_change: RegionChange::Deregisters,
1587 assert: Box::new(|result| {
1588 let current_engine = result.unwrap();
1589 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1590 }),
1591 },
1592 CurrentEngineTest {
1593 region_id,
1594 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1595 region_change: RegionChange::Deregisters,
1596 assert: Box::new(|result| {
1597 let err = result.unwrap_err();
1598 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1599 }),
1600 },
1601 CurrentEngineTest {
1602 region_id,
1603 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1604 region_change: RegionChange::Deregisters,
1605 assert: Box::new(|result| {
1606 let current_engine = result.unwrap();
1607 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1608 }),
1609 },
1610 CurrentEngineTest {
1611 region_id,
1612 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1613 region_change: RegionChange::Deregisters,
1614 assert: Box::new(|result| {
1615 let current_engine = result.unwrap();
1616 assert_matches!(current_engine, CurrentEngine::Engine(_));
1617 }),
1618 },
1619 ];
1620
1621 for test in tests {
1622 let CurrentEngineTest {
1623 region_id,
1624 current_region_status,
1625 region_change,
1626 assert,
1627 } = test;
1628
1629 if let Some(status) = current_region_status {
1631 mock_region_server
1632 .inner
1633 .region_map
1634 .insert(region_id, status);
1635 } else {
1636 mock_region_server.inner.region_map.remove(®ion_id);
1637 }
1638
1639 let result = mock_region_server
1640 .inner
1641 .get_engine(region_id, ®ion_change);
1642
1643 assert(result);
1644 }
1645 }
1646
1647 #[tokio::test]
1648 async fn test_region_server_parallelism() {
1649 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1650 let first_query = p.acquire().await;
1651 assert!(first_query.is_ok());
1652 let second_query = p.acquire().await;
1653 assert!(second_query.is_ok());
1654 let third_query = p.acquire().await;
1655 assert!(third_query.is_err());
1656 let err = third_query.unwrap_err();
1657 assert_eq!(
1658 err.output_msg(),
1659 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1660 );
1661 drop(first_query);
1662 let forth_query = p.acquire().await;
1663 assert!(forth_query.is_ok());
1664 }
1665
1666 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1667 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1668 metadata_builder.push_column_metadata(ColumnMetadata {
1669 column_schema: datatypes::schema::ColumnSchema::new(
1670 "timestamp",
1671 ConcreteDataType::timestamp_nanosecond_datatype(),
1672 false,
1673 ),
1674 semantic_type: SemanticType::Timestamp,
1675 column_id: 0,
1676 });
1677 metadata_builder.push_column_metadata(ColumnMetadata {
1678 column_schema: datatypes::schema::ColumnSchema::new(
1679 "file",
1680 ConcreteDataType::string_datatype(),
1681 true,
1682 ),
1683 semantic_type: SemanticType::Tag,
1684 column_id: 1,
1685 });
1686 metadata_builder.push_column_metadata(ColumnMetadata {
1687 column_schema: datatypes::schema::ColumnSchema::new(
1688 "message",
1689 ConcreteDataType::string_datatype(),
1690 true,
1691 ),
1692 semantic_type: SemanticType::Field,
1693 column_id: 2,
1694 });
1695 metadata_builder.primary_key(vec![1]);
1696 metadata_builder.build().unwrap()
1697 }
1698
1699 #[tokio::test]
1700 async fn test_handle_list_metadata_request() {
1701 common_telemetry::init_default_ut_logging();
1702
1703 let mut mock_region_server = mock_region_server();
1704 let region_id_1 = RegionId::new(1, 0);
1705 let region_id_2 = RegionId::new(2, 0);
1706
1707 let metadata_1 = mock_region_metadata(region_id_1);
1708 let metadata_2 = mock_region_metadata(region_id_2);
1709 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
1710
1711 let metadata_1 = Arc::new(metadata_1);
1712 let metadata_2 = Arc::new(metadata_2);
1713 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1714 MITO_ENGINE_NAME,
1715 Box::new(move |region_id| {
1716 if region_id == region_id_1 {
1717 Ok(metadata_1.clone())
1718 } else if region_id == region_id_2 {
1719 Ok(metadata_2.clone())
1720 } else {
1721 error::RegionNotFoundSnafu { region_id }.fail()
1722 }
1723 }),
1724 );
1725
1726 mock_region_server.register_engine(engine.clone());
1727 mock_region_server
1728 .inner
1729 .region_map
1730 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1731 mock_region_server
1732 .inner
1733 .region_map
1734 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1735
1736 let list_metadata_request = ListMetadataRequest {
1738 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1739 };
1740 let response = mock_region_server
1741 .handle_list_metadata_request(&list_metadata_request)
1742 .await
1743 .unwrap();
1744 let decoded_metadata: Vec<Option<RegionMetadata>> =
1745 serde_json::from_slice(&response.metadata).unwrap();
1746 assert_eq!(metadatas, decoded_metadata);
1747 }
1748
1749 #[tokio::test]
1750 async fn test_handle_list_metadata_not_found() {
1751 common_telemetry::init_default_ut_logging();
1752
1753 let mut mock_region_server = mock_region_server();
1754 let region_id_1 = RegionId::new(1, 0);
1755 let region_id_2 = RegionId::new(2, 0);
1756
1757 let metadata_1 = mock_region_metadata(region_id_1);
1758 let metadatas = vec![Some(metadata_1.clone()), None];
1759
1760 let metadata_1 = Arc::new(metadata_1);
1761 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1762 MITO_ENGINE_NAME,
1763 Box::new(move |region_id| {
1764 if region_id == region_id_1 {
1765 Ok(metadata_1.clone())
1766 } else {
1767 error::RegionNotFoundSnafu { region_id }.fail()
1768 }
1769 }),
1770 );
1771
1772 mock_region_server.register_engine(engine.clone());
1773 mock_region_server
1774 .inner
1775 .region_map
1776 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1777
1778 let list_metadata_request = ListMetadataRequest {
1780 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1781 };
1782 let response = mock_region_server
1783 .handle_list_metadata_request(&list_metadata_request)
1784 .await
1785 .unwrap();
1786 let decoded_metadata: Vec<Option<RegionMetadata>> =
1787 serde_json::from_slice(&response.metadata).unwrap();
1788 assert_eq!(metadatas, decoded_metadata);
1789
1790 mock_region_server
1792 .inner
1793 .region_map
1794 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1795 let response = mock_region_server
1796 .handle_list_metadata_request(&list_metadata_request)
1797 .await
1798 .unwrap();
1799 let decoded_metadata: Vec<Option<RegionMetadata>> =
1800 serde_json::from_slice(&response.metadata).unwrap();
1801 assert_eq!(metadatas, decoded_metadata);
1802 }
1803
1804 #[tokio::test]
1805 async fn test_handle_list_metadata_failed() {
1806 common_telemetry::init_default_ut_logging();
1807
1808 let mut mock_region_server = mock_region_server();
1809 let region_id_1 = RegionId::new(1, 0);
1810
1811 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1812 MITO_ENGINE_NAME,
1813 Box::new(move |region_id| {
1814 error::UnexpectedSnafu {
1815 violated: format!("Failed to get region {region_id}"),
1816 }
1817 .fail()
1818 }),
1819 );
1820
1821 mock_region_server.register_engine(engine.clone());
1822 mock_region_server
1823 .inner
1824 .region_map
1825 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1826
1827 let list_metadata_request = ListMetadataRequest {
1829 region_ids: vec![region_id_1.as_u64()],
1830 };
1831 mock_region_server
1832 .handle_list_metadata_request(&list_metadata_request)
1833 .await
1834 .unwrap_err();
1835 }
1836}