1use std::collections::HashMap;
16use std::fmt::Debug;
17use std::ops::Deref;
18use std::sync::{Arc, RwLock};
19use std::time::Duration;
20
21use api::region::RegionResponse;
22use api::v1::meta::TopicStat;
23use api::v1::region::sync_request::ManifestInfo;
24use api::v1::region::{
25 region_request, ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest,
26};
27use api::v1::{ResponseHeader, Status};
28use arrow_flight::{FlightData, Ticket};
29use async_trait::async_trait;
30use bytes::Bytes;
31use common_error::ext::{BoxedError, ErrorExt};
32use common_error::status_code::StatusCode;
33use common_meta::datanode::TopicStatsReporter;
34use common_query::request::QueryRequest;
35use common_query::OutputData;
36use common_recordbatch::SendableRecordBatchStream;
37use common_runtime::Runtime;
38use common_telemetry::tracing::{self, info_span};
39use common_telemetry::tracing_context::{FutureExt, TracingContext};
40use common_telemetry::{debug, error, info, warn};
41use dashmap::DashMap;
42use datafusion::datasource::{provider_as_source, TableProvider};
43use datafusion::error::Result as DfResult;
44use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
45use datafusion_expr::{LogicalPlan, TableSource};
46use futures_util::future::try_join_all;
47use metric_engine::engine::MetricEngine;
48use mito2::engine::MITO_ENGINE_NAME;
49use prost::Message;
50pub use query::dummy_catalog::{
51 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
52};
53use query::QueryEngineRef;
54use serde_json;
55use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
56use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
57use servers::grpc::region_server::RegionServerHandler;
58use servers::grpc::FlightCompression;
59use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
60use snafu::{ensure, OptionExt, ResultExt};
61use store_api::metric_engine_consts::{
62 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
63};
64use store_api::region_engine::{
65 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
66 SettableRegionRoleState,
67};
68use store_api::region_request::{
69 AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
70};
71use store_api::storage::RegionId;
72use tokio::sync::{Semaphore, SemaphorePermit};
73use tokio::time::timeout;
74use tonic::{Request, Response, Result as TonicResult};
75
76use crate::error::{
77 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
78 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
79 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
80 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
81 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
82 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
83};
84use crate::event_listener::RegionServerEventListenerRef;
85
86#[derive(Clone)]
87pub struct RegionServer {
88 inner: Arc<RegionServerInner>,
89 flight_compression: FlightCompression,
90}
91
92pub struct RegionStat {
93 pub region_id: RegionId,
94 pub engine: String,
95 pub role: RegionRole,
96}
97
98impl RegionServer {
99 pub fn new(
100 query_engine: QueryEngineRef,
101 runtime: Runtime,
102 event_listener: RegionServerEventListenerRef,
103 flight_compression: FlightCompression,
104 ) -> Self {
105 Self::with_table_provider(
106 query_engine,
107 runtime,
108 event_listener,
109 Arc::new(DummyTableProviderFactory),
110 0,
111 Duration::from_millis(0),
112 flight_compression,
113 )
114 }
115
116 pub fn with_table_provider(
117 query_engine: QueryEngineRef,
118 runtime: Runtime,
119 event_listener: RegionServerEventListenerRef,
120 table_provider_factory: TableProviderFactoryRef,
121 max_concurrent_queries: usize,
122 concurrent_query_limiter_timeout: Duration,
123 flight_compression: FlightCompression,
124 ) -> Self {
125 Self {
126 inner: Arc::new(RegionServerInner::new(
127 query_engine,
128 runtime,
129 event_listener,
130 table_provider_factory,
131 RegionServerParallelism::from_opts(
132 max_concurrent_queries,
133 concurrent_query_limiter_timeout,
134 ),
135 )),
136 flight_compression,
137 }
138 }
139
140 pub fn register_engine(&mut self, engine: RegionEngineRef) {
142 self.inner.register_engine(engine);
143 }
144
145 pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
147 self.inner.set_topic_stats_reporter(topic_stats_reporter);
148 }
149
150 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
152 match self.inner.get_engine(region_id, &RegionChange::None) {
153 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
154 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
155 Err(error::Error::RegionNotFound { .. }) => Ok(None),
156 Err(err) => Err(err),
157 }
158 }
159
160 #[tracing::instrument(skip_all)]
161 pub async fn handle_batch_open_requests(
162 &self,
163 parallelism: usize,
164 requests: Vec<(RegionId, RegionOpenRequest)>,
165 ignore_nonexistent_region: bool,
166 ) -> Result<Vec<RegionId>> {
167 self.inner
168 .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
169 .await
170 }
171
172 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
173 pub async fn handle_request(
174 &self,
175 region_id: RegionId,
176 request: RegionRequest,
177 ) -> Result<RegionResponse> {
178 self.inner.handle_request(region_id, request).await
179 }
180
181 async fn table_provider(
183 &self,
184 region_id: RegionId,
185 ctx: Option<QueryContextRef>,
186 ) -> Result<Arc<dyn TableProvider>> {
187 let status = self
188 .inner
189 .region_map
190 .get(®ion_id)
191 .context(RegionNotFoundSnafu { region_id })?
192 .clone();
193 ensure!(
194 matches!(status, RegionEngineWithStatus::Ready(_)),
195 RegionNotReadySnafu { region_id }
196 );
197
198 self.inner
199 .table_provider_factory
200 .create(region_id, status.into_engine(), ctx)
201 .await
202 .context(ExecuteLogicalPlanSnafu)
203 }
204
205 pub async fn handle_remote_read(
207 &self,
208 request: api::v1::region::QueryRequest,
209 query_ctx: QueryContextRef,
210 ) -> Result<SendableRecordBatchStream> {
211 let _permit = if let Some(p) = &self.inner.parallelism {
212 Some(p.acquire().await?)
213 } else {
214 None
215 };
216
217 let region_id = RegionId::from_u64(request.region_id);
218 let provider = self
219 .table_provider(region_id, Some(query_ctx.clone()))
220 .await?;
221 let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
222
223 if query_ctx.explain_verbose() {
224 common_telemetry::info!("Handle remote read for region: {}", region_id);
225 }
226
227 let decoder = self
228 .inner
229 .query_engine
230 .engine_context(query_ctx.clone())
231 .new_plan_decoder()
232 .context(NewPlanDecoderSnafu)?;
233
234 let plan = decoder
235 .decode(Bytes::from(request.plan), catalog_list, false)
236 .await
237 .context(DecodeLogicalPlanSnafu)?;
238
239 self.inner
240 .handle_read(
241 QueryRequest {
242 header: request.header,
243 region_id,
244 plan,
245 },
246 query_ctx,
247 )
248 .await
249 }
250
251 #[tracing::instrument(skip_all)]
252 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
253 let _permit = if let Some(p) = &self.inner.parallelism {
254 Some(p.acquire().await?)
255 } else {
256 None
257 };
258
259 let ctx = request.header.as_ref().map(|h| h.into());
260 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
261
262 let provider = self
263 .table_provider(request.region_id, Some(query_ctx.clone()))
264 .await?;
265
266 struct RegionDataSourceInjector {
267 source: Arc<dyn TableSource>,
268 }
269
270 impl TreeNodeRewriter for RegionDataSourceInjector {
271 type Node = LogicalPlan;
272
273 fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
274 Ok(match node {
275 LogicalPlan::TableScan(mut scan) => {
276 scan.source = self.source.clone();
277 Transformed::yes(LogicalPlan::TableScan(scan))
278 }
279 _ => Transformed::no(node),
280 })
281 }
282 }
283
284 let plan = request
285 .plan
286 .rewrite(&mut RegionDataSourceInjector {
287 source: provider_as_source(provider),
288 })
289 .context(DataFusionSnafu)?
290 .data;
291
292 self.inner
293 .handle_read(QueryRequest { plan, ..request }, query_ctx)
294 .await
295 }
296
297 pub fn reportable_regions(&self) -> Vec<RegionStat> {
301 self.inner
302 .region_map
303 .iter()
304 .filter_map(|e| {
305 let region_id = *e.key();
306 e.role(region_id).map(|role| RegionStat {
308 region_id,
309 engine: e.value().name().to_string(),
310 role,
311 })
312 })
313 .collect()
314 }
315
316 pub fn topic_stats(&self) -> Vec<TopicStat> {
318 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
319 let Some(reporter) = reporter.as_mut() else {
320 return vec![];
321 };
322 reporter
323 .reportable_topics()
324 .into_iter()
325 .map(|stat| TopicStat {
326 topic_name: stat.topic,
327 record_size: stat.record_size,
328 record_num: stat.record_num,
329 latest_entry_id: stat.latest_entry_id,
330 })
331 .collect()
332 }
333
334 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
335 self.inner.region_map.get(®ion_id).and_then(|engine| {
336 engine.role(region_id).map(|role| match role {
337 RegionRole::Follower => false,
338 RegionRole::Leader => true,
339 RegionRole::DowngradingLeader => true,
340 })
341 })
342 }
343
344 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
345 let engine = self
346 .inner
347 .region_map
348 .get(®ion_id)
349 .with_context(|| RegionNotFoundSnafu { region_id })?;
350 engine
351 .set_region_role(region_id, role)
352 .with_context(|_| HandleRegionRequestSnafu { region_id })
353 }
354
355 pub async fn set_region_role_state_gracefully(
365 &self,
366 region_id: RegionId,
367 state: SettableRegionRoleState,
368 ) -> Result<SetRegionRoleStateResponse> {
369 match self.inner.region_map.get(®ion_id) {
370 Some(engine) => Ok(engine
371 .set_region_role_state_gracefully(region_id, state)
372 .await
373 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
374 None => Ok(SetRegionRoleStateResponse::NotFound),
375 }
376 }
377
378 pub fn runtime(&self) -> Runtime {
379 self.inner.runtime.clone()
380 }
381
382 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
383 match self.inner.region_map.get(®ion_id) {
384 Some(e) => e.region_statistic(region_id),
385 None => None,
386 }
387 }
388
389 pub async fn stop(&self) -> Result<()> {
391 self.inner.stop().await
392 }
393
394 #[cfg(test)]
395 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
397 self.inner
398 .region_map
399 .insert(region_id, RegionEngineWithStatus::Ready(engine));
400 }
401
402 async fn handle_batch_ddl_requests(
403 &self,
404 request: region_request::Body,
405 ) -> Result<RegionResponse> {
406 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
408 .context(BuildRegionRequestsSnafu)?
409 .unwrap();
410 let tracing_context = TracingContext::from_current_span();
411
412 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
413 self.inner
414 .handle_batch_request(batch_request)
415 .trace(span)
416 .await
417 }
418
419 async fn handle_requests_in_parallel(
420 &self,
421 request: region_request::Body,
422 ) -> Result<RegionResponse> {
423 let requests =
424 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
425 let tracing_context = TracingContext::from_current_span();
426
427 let join_tasks = requests.into_iter().map(|(region_id, req)| {
428 let self_to_move = self;
429 let span = tracing_context.attach(info_span!(
430 "RegionServer::handle_region_request",
431 region_id = region_id.to_string()
432 ));
433 async move {
434 self_to_move
435 .handle_request(region_id, req)
436 .trace(span)
437 .await
438 }
439 });
440
441 let results = try_join_all(join_tasks).await?;
442 let mut affected_rows = 0;
443 let mut extensions = HashMap::new();
444 for result in results {
445 affected_rows += result.affected_rows;
446 extensions.extend(result.extensions);
447 }
448
449 Ok(RegionResponse {
450 affected_rows,
451 extensions,
452 metadata: Vec::new(),
453 })
454 }
455
456 async fn handle_requests_in_serial(
457 &self,
458 request: region_request::Body,
459 ) -> Result<RegionResponse> {
460 let requests =
461 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
462 let tracing_context = TracingContext::from_current_span();
463
464 let mut affected_rows = 0;
465 let mut extensions = HashMap::new();
466 for (region_id, req) in requests {
469 let span = tracing_context.attach(info_span!(
470 "RegionServer::handle_region_request",
471 region_id = region_id.to_string()
472 ));
473 let result = self.handle_request(region_id, req).trace(span).await?;
474
475 affected_rows += result.affected_rows;
476 extensions.extend(result.extensions);
477 }
478
479 Ok(RegionResponse {
480 affected_rows,
481 extensions,
482 metadata: Vec::new(),
483 })
484 }
485
486 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
487 let region_id = RegionId::from_u64(request.region_id);
488 let manifest_info = request
489 .manifest_info
490 .context(error::MissingRequiredFieldSnafu {
491 name: "manifest_info",
492 })?;
493
494 let manifest_info = match manifest_info {
495 ManifestInfo::MitoManifestInfo(info) => {
496 RegionManifestInfo::mito(info.data_manifest_version, 0)
497 }
498 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
499 info.data_manifest_version,
500 0,
501 info.metadata_manifest_version,
502 0,
503 ),
504 };
505
506 let tracing_context = TracingContext::from_current_span();
507 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
508
509 self.sync_region(region_id, manifest_info)
510 .trace(span)
511 .await
512 .map(|_| RegionResponse::new(AffectedRows::default()))
513 }
514
515 #[tracing::instrument(skip_all)]
520 async fn handle_list_metadata_request(
521 &self,
522 request: &ListMetadataRequest,
523 ) -> Result<RegionResponse> {
524 let mut region_metadatas = Vec::new();
525 for region_id in &request.region_ids {
527 let region_id = RegionId::from_u64(*region_id);
528 let Some(engine) = self.find_engine(region_id)? else {
530 region_metadatas.push(None);
531 continue;
532 };
533
534 match engine.get_metadata(region_id).await {
535 Ok(metadata) => region_metadatas.push(Some(metadata)),
536 Err(err) => {
537 if err.status_code() == StatusCode::RegionNotFound {
538 region_metadatas.push(None);
539 } else {
540 Err(err).with_context(|_| GetRegionMetadataSnafu {
541 engine: engine.name(),
542 region_id,
543 })?;
544 }
545 }
546 }
547 }
548
549 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
551
552 let response = RegionResponse::from_metadata(json_result);
553
554 Ok(response)
555 }
556
557 pub async fn sync_region(
559 &self,
560 region_id: RegionId,
561 manifest_info: RegionManifestInfo,
562 ) -> Result<()> {
563 let engine_with_status = self
564 .inner
565 .region_map
566 .get(®ion_id)
567 .with_context(|| RegionNotFoundSnafu { region_id })?;
568
569 self.inner
570 .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
571 .await
572 }
573}
574
575#[async_trait]
576impl RegionServerHandler for RegionServer {
577 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
578 let response = match &request {
579 region_request::Body::Creates(_)
580 | region_request::Body::Drops(_)
581 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
582 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
583 self.handle_requests_in_parallel(request).await
584 }
585 region_request::Body::Sync(sync_request) => {
586 self.handle_sync_region_request(sync_request).await
587 }
588 region_request::Body::ListMetadata(list_metadata_request) => {
589 self.handle_list_metadata_request(list_metadata_request)
590 .await
591 }
592 _ => self.handle_requests_in_serial(request).await,
593 }
594 .map_err(BoxedError::new)
595 .context(ExecuteGrpcRequestSnafu)?;
596
597 Ok(RegionResponseV1 {
598 header: Some(ResponseHeader {
599 status: Some(Status {
600 status_code: StatusCode::Success as _,
601 ..Default::default()
602 }),
603 }),
604 affected_rows: response.affected_rows as _,
605 extensions: response.extensions,
606 metadata: response.metadata,
607 })
608 }
609}
610
611#[async_trait]
612impl FlightCraft for RegionServer {
613 async fn do_get(
614 &self,
615 request: Request<Ticket>,
616 ) -> TonicResult<Response<TonicStream<FlightData>>> {
617 let ticket = request.into_inner().ticket;
618 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
619 .context(servers_error::InvalidFlightTicketSnafu)?;
620 let tracing_context = request
621 .header
622 .as_ref()
623 .map(|h| TracingContext::from_w3c(&h.tracing_context))
624 .unwrap_or_default();
625 let query_ctx = request
626 .header
627 .as_ref()
628 .map(|h| Arc::new(QueryContext::from(h)))
629 .unwrap_or(QueryContext::arc());
630
631 let result = self
632 .handle_remote_read(request, query_ctx.clone())
633 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
634 .await?;
635
636 let stream = Box::pin(FlightRecordBatchStream::new(
637 result,
638 tracing_context,
639 self.flight_compression,
640 query_ctx,
641 ));
642 Ok(Response::new(stream))
643 }
644}
645
646#[derive(Clone)]
647enum RegionEngineWithStatus {
648 Registering(RegionEngineRef),
650 Deregistering(RegionEngineRef),
652 Ready(RegionEngineRef),
654}
655
656impl RegionEngineWithStatus {
657 pub fn into_engine(self) -> RegionEngineRef {
659 match self {
660 RegionEngineWithStatus::Registering(engine) => engine,
661 RegionEngineWithStatus::Deregistering(engine) => engine,
662 RegionEngineWithStatus::Ready(engine) => engine,
663 }
664 }
665
666 pub fn engine(&self) -> &RegionEngineRef {
668 match self {
669 RegionEngineWithStatus::Registering(engine) => engine,
670 RegionEngineWithStatus::Deregistering(engine) => engine,
671 RegionEngineWithStatus::Ready(engine) => engine,
672 }
673 }
674}
675
676impl Deref for RegionEngineWithStatus {
677 type Target = RegionEngineRef;
678
679 fn deref(&self) -> &Self::Target {
680 match self {
681 RegionEngineWithStatus::Registering(engine) => engine,
682 RegionEngineWithStatus::Deregistering(engine) => engine,
683 RegionEngineWithStatus::Ready(engine) => engine,
684 }
685 }
686}
687
688struct RegionServerInner {
689 engines: RwLock<HashMap<String, RegionEngineRef>>,
690 region_map: DashMap<RegionId, RegionEngineWithStatus>,
691 query_engine: QueryEngineRef,
692 runtime: Runtime,
693 event_listener: RegionServerEventListenerRef,
694 table_provider_factory: TableProviderFactoryRef,
695 parallelism: Option<RegionServerParallelism>,
698 topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
700}
701
702struct RegionServerParallelism {
703 semaphore: Semaphore,
704 timeout: Duration,
705}
706
707impl RegionServerParallelism {
708 pub fn from_opts(
709 max_concurrent_queries: usize,
710 concurrent_query_limiter_timeout: Duration,
711 ) -> Option<Self> {
712 if max_concurrent_queries == 0 {
713 return None;
714 }
715 Some(RegionServerParallelism {
716 semaphore: Semaphore::new(max_concurrent_queries),
717 timeout: concurrent_query_limiter_timeout,
718 })
719 }
720
721 pub async fn acquire(&self) -> Result<SemaphorePermit> {
722 timeout(self.timeout, self.semaphore.acquire())
723 .await
724 .context(ConcurrentQueryLimiterTimeoutSnafu)?
725 .context(ConcurrentQueryLimiterClosedSnafu)
726 }
727}
728
729enum CurrentEngine {
730 Engine(RegionEngineRef),
731 EarlyReturn(AffectedRows),
732}
733
734impl Debug for CurrentEngine {
735 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
736 match self {
737 CurrentEngine::Engine(engine) => f
738 .debug_struct("CurrentEngine")
739 .field("engine", &engine.name())
740 .finish(),
741 CurrentEngine::EarlyReturn(rows) => f
742 .debug_struct("CurrentEngine")
743 .field("return", rows)
744 .finish(),
745 }
746 }
747}
748
749impl RegionServerInner {
750 pub fn new(
751 query_engine: QueryEngineRef,
752 runtime: Runtime,
753 event_listener: RegionServerEventListenerRef,
754 table_provider_factory: TableProviderFactoryRef,
755 parallelism: Option<RegionServerParallelism>,
756 ) -> Self {
757 Self {
758 engines: RwLock::new(HashMap::new()),
759 region_map: DashMap::new(),
760 query_engine,
761 runtime,
762 event_listener,
763 table_provider_factory,
764 parallelism,
765 topic_stats_reporter: RwLock::new(None),
766 }
767 }
768
769 pub fn register_engine(&self, engine: RegionEngineRef) {
770 let engine_name = engine.name();
771 info!("Region Engine {engine_name} is registered");
772 self.engines
773 .write()
774 .unwrap()
775 .insert(engine_name.to_string(), engine);
776 }
777
778 pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
779 info!("Set topic stats reporter");
780 *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
781 }
782
783 fn get_engine(
784 &self,
785 region_id: RegionId,
786 region_change: &RegionChange,
787 ) -> Result<CurrentEngine> {
788 let current_region_status = self.region_map.get(®ion_id);
789
790 let engine = match region_change {
791 RegionChange::Register(attribute) => match current_region_status {
792 Some(status) => match status.clone() {
793 RegionEngineWithStatus::Registering(engine) => engine,
794 RegionEngineWithStatus::Deregistering(_) => {
795 return error::RegionBusySnafu { region_id }.fail()
796 }
797 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
798 },
799 _ => self
800 .engines
801 .read()
802 .unwrap()
803 .get(attribute.engine())
804 .with_context(|| RegionEngineNotFoundSnafu {
805 name: attribute.engine(),
806 })?
807 .clone(),
808 },
809 RegionChange::Deregisters => match current_region_status {
810 Some(status) => match status.clone() {
811 RegionEngineWithStatus::Registering(_) => {
812 return error::RegionBusySnafu { region_id }.fail()
813 }
814 RegionEngineWithStatus::Deregistering(_) => {
815 return Ok(CurrentEngine::EarlyReturn(0))
816 }
817 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
818 },
819 None => return Ok(CurrentEngine::EarlyReturn(0)),
820 },
821 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
822 match current_region_status {
823 Some(status) => match status.clone() {
824 RegionEngineWithStatus::Registering(_) => {
825 return error::RegionNotReadySnafu { region_id }.fail()
826 }
827 RegionEngineWithStatus::Deregistering(_) => {
828 return error::RegionNotFoundSnafu { region_id }.fail()
829 }
830 RegionEngineWithStatus::Ready(engine) => engine,
831 },
832 None => return error::RegionNotFoundSnafu { region_id }.fail(),
833 }
834 }
835 };
836
837 Ok(CurrentEngine::Engine(engine))
838 }
839
840 async fn handle_batch_open_requests_inner(
841 &self,
842 engine: RegionEngineRef,
843 parallelism: usize,
844 requests: Vec<(RegionId, RegionOpenRequest)>,
845 ignore_nonexistent_region: bool,
846 ) -> Result<Vec<RegionId>> {
847 let region_changes = requests
848 .iter()
849 .map(|(region_id, open)| {
850 let attribute = parse_region_attribute(&open.engine, &open.options)?;
851 Ok((*region_id, RegionChange::Register(attribute)))
852 })
853 .collect::<Result<HashMap<_, _>>>()?;
854
855 for (®ion_id, region_change) in ®ion_changes {
856 self.set_region_status_not_ready(region_id, &engine, region_change)
857 }
858
859 let mut open_regions = Vec::with_capacity(requests.len());
860 let mut errors = vec![];
861 match engine
862 .handle_batch_open_requests(parallelism, requests)
863 .await
864 .with_context(|_| HandleBatchOpenRequestSnafu)
865 {
866 Ok(results) => {
867 for (region_id, result) in results {
868 let region_change = ®ion_changes[®ion_id];
869 match result {
870 Ok(_) => {
871 if let Err(e) = self
872 .set_region_status_ready(region_id, engine.clone(), *region_change)
873 .await
874 {
875 error!(e; "Failed to set region to ready: {}", region_id);
876 errors.push(BoxedError::new(e));
877 } else {
878 open_regions.push(region_id)
879 }
880 }
881 Err(e) => {
882 self.unset_region_status(region_id, &engine, *region_change);
883 if e.status_code() == StatusCode::RegionNotFound
884 && ignore_nonexistent_region
885 {
886 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
887 } else {
888 error!(e; "Failed to open region: {}", region_id);
889 errors.push(e);
890 }
891 }
892 }
893 }
894 }
895 Err(e) => {
896 for (®ion_id, region_change) in ®ion_changes {
897 self.unset_region_status(region_id, &engine, *region_change);
898 }
899 error!(e; "Failed to open batch regions");
900 errors.push(BoxedError::new(e));
901 }
902 }
903
904 if !errors.is_empty() {
905 return error::UnexpectedSnafu {
906 violated: format!("Failed to open batch regions: {:?}", errors[0]),
908 }
909 .fail();
910 }
911
912 Ok(open_regions)
913 }
914
915 pub async fn handle_batch_open_requests(
916 &self,
917 parallelism: usize,
918 requests: Vec<(RegionId, RegionOpenRequest)>,
919 ignore_nonexistent_region: bool,
920 ) -> Result<Vec<RegionId>> {
921 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
922 HashMap::with_capacity(requests.len());
923 for (region_id, request) in requests {
924 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
925 requests.push((region_id, request));
926 } else {
927 engine_grouped_requests
928 .insert(request.engine.to_string(), vec![(region_id, request)]);
929 }
930 }
931
932 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
933 for (engine, requests) in engine_grouped_requests {
934 let engine = self
935 .engines
936 .read()
937 .unwrap()
938 .get(&engine)
939 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
940 .clone();
941 results.push(
942 self.handle_batch_open_requests_inner(
943 engine,
944 parallelism,
945 requests,
946 ignore_nonexistent_region,
947 )
948 .await,
949 )
950 }
951
952 Ok(results
953 .into_iter()
954 .collect::<Result<Vec<_>>>()?
955 .into_iter()
956 .flatten()
957 .collect::<Vec<_>>())
958 }
959
960 pub async fn handle_batch_request(
964 &self,
965 batch_request: BatchRegionDdlRequest,
966 ) -> Result<RegionResponse> {
967 let region_changes = match &batch_request {
968 BatchRegionDdlRequest::Create(requests) => requests
969 .iter()
970 .map(|(region_id, create)| {
971 let attribute = parse_region_attribute(&create.engine, &create.options)?;
972 Ok((*region_id, RegionChange::Register(attribute)))
973 })
974 .collect::<Result<Vec<_>>>()?,
975 BatchRegionDdlRequest::Drop(requests) => requests
976 .iter()
977 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
978 .collect::<Vec<_>>(),
979 BatchRegionDdlRequest::Alter(requests) => requests
980 .iter()
981 .map(|(region_id, _)| (*region_id, RegionChange::None))
982 .collect::<Vec<_>>(),
983 };
984
985 let (first_region_id, first_region_change) = region_changes.first().unwrap();
988 let engine = match self.get_engine(*first_region_id, first_region_change)? {
989 CurrentEngine::Engine(engine) => engine,
990 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
991 };
992
993 for (region_id, region_change) in region_changes.iter() {
994 self.set_region_status_not_ready(*region_id, &engine, region_change);
995 }
996
997 let ddl_type = batch_request.request_type();
998 let result = engine
999 .handle_batch_ddl_requests(batch_request)
1000 .await
1001 .context(HandleBatchDdlRequestSnafu { ddl_type });
1002
1003 match result {
1004 Ok(result) => {
1005 for (region_id, region_change) in ®ion_changes {
1006 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1007 .await?;
1008 }
1009
1010 Ok(RegionResponse {
1011 affected_rows: result.affected_rows,
1012 extensions: result.extensions,
1013 metadata: Vec::new(),
1014 })
1015 }
1016 Err(err) => {
1017 for (region_id, region_change) in region_changes {
1018 self.unset_region_status(region_id, &engine, region_change);
1019 }
1020
1021 Err(err)
1022 }
1023 }
1024 }
1025
1026 pub async fn handle_request(
1027 &self,
1028 region_id: RegionId,
1029 request: RegionRequest,
1030 ) -> Result<RegionResponse> {
1031 let request_type = request.request_type();
1032 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1033 .with_label_values(&[request_type])
1034 .start_timer();
1035
1036 let region_change = match &request {
1037 RegionRequest::Create(create) => {
1038 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1039 RegionChange::Register(attribute)
1040 }
1041 RegionRequest::Open(open) => {
1042 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1043 RegionChange::Register(attribute)
1044 }
1045 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1046 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1047 RegionChange::Ingest
1048 }
1049 RegionRequest::Alter(_)
1050 | RegionRequest::Flush(_)
1051 | RegionRequest::Compact(_)
1052 | RegionRequest::Truncate(_) => RegionChange::None,
1053 RegionRequest::Catchup(_) => RegionChange::Catchup,
1054 };
1055
1056 let engine = match self.get_engine(region_id, ®ion_change)? {
1057 CurrentEngine::Engine(engine) => engine,
1058 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1059 };
1060
1061 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1063
1064 match engine
1065 .handle_request(region_id, request)
1066 .await
1067 .with_context(|_| HandleRegionRequestSnafu { region_id })
1068 {
1069 Ok(result) => {
1070 if matches!(region_change, RegionChange::Ingest) {
1072 crate::metrics::REGION_CHANGED_ROW_COUNT
1073 .with_label_values(&[request_type])
1074 .inc_by(result.affected_rows as u64);
1075 }
1076 self.set_region_status_ready(region_id, engine.clone(), region_change)
1078 .await?;
1079
1080 Ok(RegionResponse {
1081 affected_rows: result.affected_rows,
1082 extensions: result.extensions,
1083 metadata: Vec::new(),
1084 })
1085 }
1086 Err(err) => {
1087 self.unset_region_status(region_id, &engine, region_change);
1089 Err(err)
1090 }
1091 }
1092 }
1093
1094 pub async fn handle_sync_region(
1096 &self,
1097 engine: &RegionEngineRef,
1098 region_id: RegionId,
1099 manifest_info: RegionManifestInfo,
1100 ) -> Result<()> {
1101 let Some(new_opened_regions) = engine
1102 .sync_region(region_id, manifest_info)
1103 .await
1104 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1105 .new_opened_logical_region_ids()
1106 else {
1107 warn!("No new opened logical regions");
1108 return Ok(());
1109 };
1110
1111 for region in new_opened_regions {
1112 self.region_map
1113 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1114 info!("Logical region {} is registered!", region);
1115 }
1116
1117 Ok(())
1118 }
1119
1120 fn set_region_status_not_ready(
1121 &self,
1122 region_id: RegionId,
1123 engine: &RegionEngineRef,
1124 region_change: &RegionChange,
1125 ) {
1126 match region_change {
1127 RegionChange::Register(_) => {
1128 self.region_map.insert(
1129 region_id,
1130 RegionEngineWithStatus::Registering(engine.clone()),
1131 );
1132 }
1133 RegionChange::Deregisters => {
1134 self.region_map.insert(
1135 region_id,
1136 RegionEngineWithStatus::Deregistering(engine.clone()),
1137 );
1138 }
1139 _ => {}
1140 }
1141 }
1142
1143 fn unset_region_status(
1144 &self,
1145 region_id: RegionId,
1146 engine: &RegionEngineRef,
1147 region_change: RegionChange,
1148 ) {
1149 match region_change {
1150 RegionChange::None | RegionChange::Ingest => {}
1151 RegionChange::Register(_) => {
1152 self.region_map.remove(®ion_id);
1153 }
1154 RegionChange::Deregisters => {
1155 self.region_map
1156 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1157 }
1158 RegionChange::Catchup => {}
1159 }
1160 }
1161
1162 async fn set_region_status_ready(
1163 &self,
1164 region_id: RegionId,
1165 engine: RegionEngineRef,
1166 region_change: RegionChange,
1167 ) -> Result<()> {
1168 let engine_type = engine.name();
1169 match region_change {
1170 RegionChange::None | RegionChange::Ingest => {}
1171 RegionChange::Register(attribute) => {
1172 info!(
1173 "Region {region_id} is registered to engine {}",
1174 attribute.engine()
1175 );
1176 self.region_map
1177 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1178
1179 match attribute {
1180 RegionAttribute::Metric { physical } => {
1181 if physical {
1182 self.register_logical_regions(&engine, region_id).await?;
1184 self.event_listener.on_region_registered(region_id);
1186 }
1187 }
1188 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1189 RegionAttribute::File => {
1190 }
1192 }
1193 }
1194 RegionChange::Deregisters => {
1195 info!("Region {region_id} is deregistered from engine {engine_type}");
1196 self.region_map
1197 .remove(®ion_id)
1198 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1199 self.event_listener.on_region_deregistered(region_id);
1200 }
1201 RegionChange::Catchup => {
1202 if is_metric_engine(engine.name()) {
1203 self.register_logical_regions(&engine, region_id).await?;
1205 }
1206 }
1207 }
1208 Ok(())
1209 }
1210
1211 async fn register_logical_regions(
1212 &self,
1213 engine: &RegionEngineRef,
1214 physical_region_id: RegionId,
1215 ) -> Result<()> {
1216 let metric_engine =
1217 engine
1218 .as_any()
1219 .downcast_ref::<MetricEngine>()
1220 .context(UnexpectedSnafu {
1221 violated: format!(
1222 "expecting engine type '{}', actual '{}'",
1223 METRIC_ENGINE_NAME,
1224 engine.name(),
1225 ),
1226 })?;
1227
1228 let logical_regions = metric_engine
1229 .logical_regions(physical_region_id)
1230 .await
1231 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1232
1233 for region in logical_regions {
1234 self.region_map
1235 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1236 info!("Logical region {} is registered!", region);
1237 }
1238 Ok(())
1239 }
1240
1241 pub async fn handle_read(
1242 &self,
1243 request: QueryRequest,
1244 query_ctx: QueryContextRef,
1245 ) -> Result<SendableRecordBatchStream> {
1246 let result = self
1249 .query_engine
1250 .execute(request.plan, query_ctx)
1251 .await
1252 .context(ExecuteLogicalPlanSnafu)?;
1253
1254 match result.data {
1255 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1256 UnsupportedOutputSnafu { expected: "stream" }.fail()
1257 }
1258 OutputData::Stream(stream) => Ok(stream),
1259 }
1260 }
1261
1262 async fn stop(&self) -> Result<()> {
1263 let regions = self
1271 .region_map
1272 .iter()
1273 .map(|x| (*x.key(), x.value().clone()))
1274 .collect::<Vec<_>>();
1275 let num_regions = regions.len();
1276
1277 for (region_id, engine) in regions {
1278 let closed = engine
1279 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1280 .await;
1281 match closed {
1282 Ok(_) => debug!("Region {region_id} is closed"),
1283 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1284 }
1285 }
1286 self.region_map.clear();
1287 info!("closed {num_regions} regions");
1288
1289 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1290 for (engine_name, engine) in engines {
1291 engine
1292 .stop()
1293 .await
1294 .context(StopRegionEngineSnafu { name: &engine_name })?;
1295 info!("Region engine {engine_name} is stopped");
1296 }
1297
1298 Ok(())
1299 }
1300}
1301
1302#[derive(Debug, Clone, Copy)]
1303enum RegionChange {
1304 None,
1305 Register(RegionAttribute),
1306 Deregisters,
1307 Catchup,
1308 Ingest,
1309}
1310
1311fn is_metric_engine(engine: &str) -> bool {
1312 engine == METRIC_ENGINE_NAME
1313}
1314
1315fn parse_region_attribute(
1316 engine: &str,
1317 options: &HashMap<String, String>,
1318) -> Result<RegionAttribute> {
1319 match engine {
1320 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1321 METRIC_ENGINE_NAME => {
1322 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1323
1324 Ok(RegionAttribute::Metric { physical })
1325 }
1326 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1327 _ => error::UnexpectedSnafu {
1328 violated: format!("Unknown engine: {}", engine),
1329 }
1330 .fail(),
1331 }
1332}
1333
1334#[derive(Debug, Clone, Copy)]
1335enum RegionAttribute {
1336 Mito,
1337 Metric { physical: bool },
1338 File,
1339}
1340
1341impl RegionAttribute {
1342 fn engine(&self) -> &'static str {
1343 match self {
1344 RegionAttribute::Mito => MITO_ENGINE_NAME,
1345 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1346 RegionAttribute::File => FILE_ENGINE_NAME,
1347 }
1348 }
1349}
1350
1351#[cfg(test)]
1352mod tests {
1353
1354 use std::assert_matches::assert_matches;
1355
1356 use api::v1::SemanticType;
1357 use common_error::ext::ErrorExt;
1358 use datatypes::prelude::ConcreteDataType;
1359 use mito2::test_util::CreateRequestBuilder;
1360 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1361 use store_api::region_engine::RegionEngine;
1362 use store_api::region_request::{
1363 PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1364 };
1365 use store_api::storage::RegionId;
1366
1367 use super::*;
1368 use crate::error::Result;
1369 use crate::tests::{mock_region_server, MockRegionEngine};
1370
1371 #[tokio::test]
1372 async fn test_region_registering() {
1373 common_telemetry::init_default_ut_logging();
1374
1375 let mut mock_region_server = mock_region_server();
1376 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1377 let engine_name = engine.name();
1378 mock_region_server.register_engine(engine.clone());
1379 let region_id = RegionId::new(1, 1);
1380 let builder = CreateRequestBuilder::new();
1381 let create_req = builder.build();
1382 mock_region_server.inner.region_map.insert(
1384 region_id,
1385 RegionEngineWithStatus::Registering(engine.clone()),
1386 );
1387 let response = mock_region_server
1388 .handle_request(region_id, RegionRequest::Create(create_req))
1389 .await
1390 .unwrap();
1391 assert_eq!(response.affected_rows, 0);
1392 let status = mock_region_server
1393 .inner
1394 .region_map
1395 .get(®ion_id)
1396 .unwrap()
1397 .clone();
1398 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1399
1400 mock_region_server.inner.region_map.insert(
1401 region_id,
1402 RegionEngineWithStatus::Registering(engine.clone()),
1403 );
1404 let response = mock_region_server
1405 .handle_request(
1406 region_id,
1407 RegionRequest::Open(RegionOpenRequest {
1408 engine: engine_name.to_string(),
1409 table_dir: String::new(),
1410 path_type: PathType::Bare,
1411 options: Default::default(),
1412 skip_wal_replay: false,
1413 checkpoint: None,
1414 }),
1415 )
1416 .await
1417 .unwrap();
1418 assert_eq!(response.affected_rows, 0);
1419 let status = mock_region_server
1420 .inner
1421 .region_map
1422 .get(®ion_id)
1423 .unwrap()
1424 .clone();
1425 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1426 }
1427
1428 #[tokio::test]
1429 async fn test_region_deregistering() {
1430 common_telemetry::init_default_ut_logging();
1431
1432 let mut mock_region_server = mock_region_server();
1433 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1434
1435 mock_region_server.register_engine(engine.clone());
1436
1437 let region_id = RegionId::new(1, 1);
1438
1439 mock_region_server.inner.region_map.insert(
1441 region_id,
1442 RegionEngineWithStatus::Deregistering(engine.clone()),
1443 );
1444
1445 let response = mock_region_server
1446 .handle_request(
1447 region_id,
1448 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1449 )
1450 .await
1451 .unwrap();
1452 assert_eq!(response.affected_rows, 0);
1453
1454 let status = mock_region_server
1455 .inner
1456 .region_map
1457 .get(®ion_id)
1458 .unwrap()
1459 .clone();
1460 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1461
1462 mock_region_server.inner.region_map.insert(
1463 region_id,
1464 RegionEngineWithStatus::Deregistering(engine.clone()),
1465 );
1466
1467 let response = mock_region_server
1468 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1469 .await
1470 .unwrap();
1471 assert_eq!(response.affected_rows, 0);
1472
1473 let status = mock_region_server
1474 .inner
1475 .region_map
1476 .get(®ion_id)
1477 .unwrap()
1478 .clone();
1479 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1480 }
1481
1482 #[tokio::test]
1483 async fn test_region_not_ready() {
1484 common_telemetry::init_default_ut_logging();
1485
1486 let mut mock_region_server = mock_region_server();
1487 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1488
1489 mock_region_server.register_engine(engine.clone());
1490
1491 let region_id = RegionId::new(1, 1);
1492
1493 mock_region_server.inner.region_map.insert(
1495 region_id,
1496 RegionEngineWithStatus::Registering(engine.clone()),
1497 );
1498
1499 let err = mock_region_server
1500 .handle_request(
1501 region_id,
1502 RegionRequest::Truncate(RegionTruncateRequest::All),
1503 )
1504 .await
1505 .unwrap_err();
1506
1507 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1508 }
1509
1510 #[tokio::test]
1511 async fn test_region_request_failed() {
1512 common_telemetry::init_default_ut_logging();
1513
1514 let mut mock_region_server = mock_region_server();
1515 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1516 MITO_ENGINE_NAME,
1517 Box::new(|_region_id, _request| {
1518 error::UnexpectedSnafu {
1519 violated: "test".to_string(),
1520 }
1521 .fail()
1522 }),
1523 );
1524
1525 mock_region_server.register_engine(engine.clone());
1526
1527 let region_id = RegionId::new(1, 1);
1528 let builder = CreateRequestBuilder::new();
1529 let create_req = builder.build();
1530 mock_region_server
1531 .handle_request(region_id, RegionRequest::Create(create_req))
1532 .await
1533 .unwrap_err();
1534
1535 let status = mock_region_server.inner.region_map.get(®ion_id);
1536 assert!(status.is_none());
1537
1538 mock_region_server
1539 .inner
1540 .region_map
1541 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1542
1543 mock_region_server
1544 .handle_request(
1545 region_id,
1546 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1547 )
1548 .await
1549 .unwrap_err();
1550
1551 let status = mock_region_server.inner.region_map.get(®ion_id);
1552 assert!(status.is_some());
1553 }
1554
1555 #[tokio::test]
1556 async fn test_batch_open_region_ignore_nonexistent_regions() {
1557 common_telemetry::init_default_ut_logging();
1558 let mut mock_region_server = mock_region_server();
1559 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1560 MITO_ENGINE_NAME,
1561 Box::new(|region_id, _request| {
1562 if region_id == RegionId::new(1, 1) {
1563 error::RegionNotFoundSnafu { region_id }.fail()
1564 } else {
1565 Ok(0)
1566 }
1567 }),
1568 );
1569 mock_region_server.register_engine(engine.clone());
1570
1571 let region_ids = mock_region_server
1572 .handle_batch_open_requests(
1573 8,
1574 vec![
1575 (
1576 RegionId::new(1, 1),
1577 RegionOpenRequest {
1578 engine: MITO_ENGINE_NAME.to_string(),
1579 table_dir: String::new(),
1580 path_type: PathType::Bare,
1581 options: Default::default(),
1582 skip_wal_replay: false,
1583 checkpoint: None,
1584 },
1585 ),
1586 (
1587 RegionId::new(1, 2),
1588 RegionOpenRequest {
1589 engine: MITO_ENGINE_NAME.to_string(),
1590 table_dir: String::new(),
1591 path_type: PathType::Bare,
1592 options: Default::default(),
1593 skip_wal_replay: false,
1594 checkpoint: None,
1595 },
1596 ),
1597 ],
1598 true,
1599 )
1600 .await
1601 .unwrap();
1602 assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1603
1604 let err = mock_region_server
1605 .handle_batch_open_requests(
1606 8,
1607 vec![
1608 (
1609 RegionId::new(1, 1),
1610 RegionOpenRequest {
1611 engine: MITO_ENGINE_NAME.to_string(),
1612 table_dir: String::new(),
1613 path_type: PathType::Bare,
1614 options: Default::default(),
1615 skip_wal_replay: false,
1616 checkpoint: None,
1617 },
1618 ),
1619 (
1620 RegionId::new(1, 2),
1621 RegionOpenRequest {
1622 engine: MITO_ENGINE_NAME.to_string(),
1623 table_dir: String::new(),
1624 path_type: PathType::Bare,
1625 options: Default::default(),
1626 skip_wal_replay: false,
1627 checkpoint: None,
1628 },
1629 ),
1630 ],
1631 false,
1632 )
1633 .await
1634 .unwrap_err();
1635 assert_eq!(err.status_code(), StatusCode::Unexpected);
1636 }
1637
1638 struct CurrentEngineTest {
1639 region_id: RegionId,
1640 current_region_status: Option<RegionEngineWithStatus>,
1641 region_change: RegionChange,
1642 assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
1643 }
1644
1645 #[tokio::test]
1646 async fn test_current_engine() {
1647 common_telemetry::init_default_ut_logging();
1648
1649 let mut mock_region_server = mock_region_server();
1650 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1651 mock_region_server.register_engine(engine.clone());
1652
1653 let region_id = RegionId::new(1024, 1);
1654 let tests = vec![
1655 CurrentEngineTest {
1657 region_id,
1658 current_region_status: None,
1659 region_change: RegionChange::None,
1660 assert: Box::new(|result| {
1661 let err = result.unwrap_err();
1662 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1663 }),
1664 },
1665 CurrentEngineTest {
1666 region_id,
1667 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1668 region_change: RegionChange::None,
1669 assert: Box::new(|result| {
1670 let current_engine = result.unwrap();
1671 assert_matches!(current_engine, CurrentEngine::Engine(_));
1672 }),
1673 },
1674 CurrentEngineTest {
1675 region_id,
1676 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1677 region_change: RegionChange::None,
1678 assert: Box::new(|result| {
1679 let err = result.unwrap_err();
1680 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1681 }),
1682 },
1683 CurrentEngineTest {
1684 region_id,
1685 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1686 region_change: RegionChange::None,
1687 assert: Box::new(|result| {
1688 let err = result.unwrap_err();
1689 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1690 }),
1691 },
1692 CurrentEngineTest {
1694 region_id,
1695 current_region_status: None,
1696 region_change: RegionChange::Register(RegionAttribute::Mito),
1697 assert: Box::new(|result| {
1698 let current_engine = result.unwrap();
1699 assert_matches!(current_engine, CurrentEngine::Engine(_));
1700 }),
1701 },
1702 CurrentEngineTest {
1703 region_id,
1704 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1705 region_change: RegionChange::Register(RegionAttribute::Mito),
1706 assert: Box::new(|result| {
1707 let current_engine = result.unwrap();
1708 assert_matches!(current_engine, CurrentEngine::Engine(_));
1709 }),
1710 },
1711 CurrentEngineTest {
1712 region_id,
1713 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1714 region_change: RegionChange::Register(RegionAttribute::Mito),
1715 assert: Box::new(|result| {
1716 let err = result.unwrap_err();
1717 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1718 }),
1719 },
1720 CurrentEngineTest {
1721 region_id,
1722 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1723 region_change: RegionChange::Register(RegionAttribute::Mito),
1724 assert: Box::new(|result| {
1725 let current_engine = result.unwrap();
1726 assert_matches!(current_engine, CurrentEngine::Engine(_));
1727 }),
1728 },
1729 CurrentEngineTest {
1731 region_id,
1732 current_region_status: None,
1733 region_change: RegionChange::Deregisters,
1734 assert: Box::new(|result| {
1735 let current_engine = result.unwrap();
1736 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1737 }),
1738 },
1739 CurrentEngineTest {
1740 region_id,
1741 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1742 region_change: RegionChange::Deregisters,
1743 assert: Box::new(|result| {
1744 let err = result.unwrap_err();
1745 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1746 }),
1747 },
1748 CurrentEngineTest {
1749 region_id,
1750 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1751 region_change: RegionChange::Deregisters,
1752 assert: Box::new(|result| {
1753 let current_engine = result.unwrap();
1754 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1755 }),
1756 },
1757 CurrentEngineTest {
1758 region_id,
1759 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1760 region_change: RegionChange::Deregisters,
1761 assert: Box::new(|result| {
1762 let current_engine = result.unwrap();
1763 assert_matches!(current_engine, CurrentEngine::Engine(_));
1764 }),
1765 },
1766 ];
1767
1768 for test in tests {
1769 let CurrentEngineTest {
1770 region_id,
1771 current_region_status,
1772 region_change,
1773 assert,
1774 } = test;
1775
1776 if let Some(status) = current_region_status {
1778 mock_region_server
1779 .inner
1780 .region_map
1781 .insert(region_id, status);
1782 } else {
1783 mock_region_server.inner.region_map.remove(®ion_id);
1784 }
1785
1786 let result = mock_region_server
1787 .inner
1788 .get_engine(region_id, ®ion_change);
1789
1790 assert(result);
1791 }
1792 }
1793
1794 #[tokio::test]
1795 async fn test_region_server_parallelism() {
1796 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1797 let first_query = p.acquire().await;
1798 assert!(first_query.is_ok());
1799 let second_query = p.acquire().await;
1800 assert!(second_query.is_ok());
1801 let third_query = p.acquire().await;
1802 assert!(third_query.is_err());
1803 let err = third_query.unwrap_err();
1804 assert_eq!(
1805 err.output_msg(),
1806 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1807 );
1808 drop(first_query);
1809 let forth_query = p.acquire().await;
1810 assert!(forth_query.is_ok());
1811 }
1812
1813 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1814 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1815 metadata_builder.push_column_metadata(ColumnMetadata {
1816 column_schema: datatypes::schema::ColumnSchema::new(
1817 "timestamp",
1818 ConcreteDataType::timestamp_nanosecond_datatype(),
1819 false,
1820 ),
1821 semantic_type: SemanticType::Timestamp,
1822 column_id: 0,
1823 });
1824 metadata_builder.push_column_metadata(ColumnMetadata {
1825 column_schema: datatypes::schema::ColumnSchema::new(
1826 "file",
1827 ConcreteDataType::string_datatype(),
1828 true,
1829 ),
1830 semantic_type: SemanticType::Tag,
1831 column_id: 1,
1832 });
1833 metadata_builder.push_column_metadata(ColumnMetadata {
1834 column_schema: datatypes::schema::ColumnSchema::new(
1835 "message",
1836 ConcreteDataType::string_datatype(),
1837 true,
1838 ),
1839 semantic_type: SemanticType::Field,
1840 column_id: 2,
1841 });
1842 metadata_builder.primary_key(vec![1]);
1843 metadata_builder.build().unwrap()
1844 }
1845
1846 #[tokio::test]
1847 async fn test_handle_list_metadata_request() {
1848 common_telemetry::init_default_ut_logging();
1849
1850 let mut mock_region_server = mock_region_server();
1851 let region_id_1 = RegionId::new(1, 0);
1852 let region_id_2 = RegionId::new(2, 0);
1853
1854 let metadata_1 = mock_region_metadata(region_id_1);
1855 let metadata_2 = mock_region_metadata(region_id_2);
1856 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
1857
1858 let metadata_1 = Arc::new(metadata_1);
1859 let metadata_2 = Arc::new(metadata_2);
1860 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1861 MITO_ENGINE_NAME,
1862 Box::new(move |region_id| {
1863 if region_id == region_id_1 {
1864 Ok(metadata_1.clone())
1865 } else if region_id == region_id_2 {
1866 Ok(metadata_2.clone())
1867 } else {
1868 error::RegionNotFoundSnafu { region_id }.fail()
1869 }
1870 }),
1871 );
1872
1873 mock_region_server.register_engine(engine.clone());
1874 mock_region_server
1875 .inner
1876 .region_map
1877 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1878 mock_region_server
1879 .inner
1880 .region_map
1881 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1882
1883 let list_metadata_request = ListMetadataRequest {
1885 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1886 };
1887 let response = mock_region_server
1888 .handle_list_metadata_request(&list_metadata_request)
1889 .await
1890 .unwrap();
1891 let decoded_metadata: Vec<Option<RegionMetadata>> =
1892 serde_json::from_slice(&response.metadata).unwrap();
1893 assert_eq!(metadatas, decoded_metadata);
1894 }
1895
1896 #[tokio::test]
1897 async fn test_handle_list_metadata_not_found() {
1898 common_telemetry::init_default_ut_logging();
1899
1900 let mut mock_region_server = mock_region_server();
1901 let region_id_1 = RegionId::new(1, 0);
1902 let region_id_2 = RegionId::new(2, 0);
1903
1904 let metadata_1 = mock_region_metadata(region_id_1);
1905 let metadatas = vec![Some(metadata_1.clone()), None];
1906
1907 let metadata_1 = Arc::new(metadata_1);
1908 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1909 MITO_ENGINE_NAME,
1910 Box::new(move |region_id| {
1911 if region_id == region_id_1 {
1912 Ok(metadata_1.clone())
1913 } else {
1914 error::RegionNotFoundSnafu { region_id }.fail()
1915 }
1916 }),
1917 );
1918
1919 mock_region_server.register_engine(engine.clone());
1920 mock_region_server
1921 .inner
1922 .region_map
1923 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1924
1925 let list_metadata_request = ListMetadataRequest {
1927 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1928 };
1929 let response = mock_region_server
1930 .handle_list_metadata_request(&list_metadata_request)
1931 .await
1932 .unwrap();
1933 let decoded_metadata: Vec<Option<RegionMetadata>> =
1934 serde_json::from_slice(&response.metadata).unwrap();
1935 assert_eq!(metadatas, decoded_metadata);
1936
1937 mock_region_server
1939 .inner
1940 .region_map
1941 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1942 let response = mock_region_server
1943 .handle_list_metadata_request(&list_metadata_request)
1944 .await
1945 .unwrap();
1946 let decoded_metadata: Vec<Option<RegionMetadata>> =
1947 serde_json::from_slice(&response.metadata).unwrap();
1948 assert_eq!(metadatas, decoded_metadata);
1949 }
1950
1951 #[tokio::test]
1952 async fn test_handle_list_metadata_failed() {
1953 common_telemetry::init_default_ut_logging();
1954
1955 let mut mock_region_server = mock_region_server();
1956 let region_id_1 = RegionId::new(1, 0);
1957
1958 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1959 MITO_ENGINE_NAME,
1960 Box::new(move |region_id| {
1961 error::UnexpectedSnafu {
1962 violated: format!("Failed to get region {region_id}"),
1963 }
1964 .fail()
1965 }),
1966 );
1967
1968 mock_region_server.register_engine(engine.clone());
1969 mock_region_server
1970 .inner
1971 .region_map
1972 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1973
1974 let list_metadata_request = ListMetadataRequest {
1976 region_ids: vec![region_id_1.as_u64()],
1977 };
1978 mock_region_server
1979 .handle_list_metadata_request(&list_metadata_request)
1980 .await
1981 .unwrap_err();
1982 }
1983}