1mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::{Arc, RwLock};
21use std::time::Duration;
22
23use api::region::RegionResponse;
24use api::v1::meta::TopicStat;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27 ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
28};
29use api::v1::{ResponseHeader, Status};
30use arrow_flight::{FlightData, Ticket};
31use async_trait::async_trait;
32use bytes::Bytes;
33use common_error::ext::{BoxedError, ErrorExt};
34use common_error::status_code::StatusCode;
35use common_meta::datanode::TopicStatsReporter;
36use common_query::OutputData;
37use common_query::request::QueryRequest;
38use common_recordbatch::SendableRecordBatchStream;
39use common_runtime::Runtime;
40use common_telemetry::tracing::{self, info_span};
41use common_telemetry::tracing_context::{FutureExt, TracingContext};
42use common_telemetry::{debug, error, info, warn};
43use dashmap::DashMap;
44use datafusion::datasource::TableProvider;
45use datafusion_common::tree_node::TreeNode;
46use futures_util::future::try_join_all;
47use metric_engine::engine::MetricEngine;
48use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
49use prost::Message;
50use query::QueryEngineRef;
51pub use query::dummy_catalog::{
52 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
53};
54use serde_json;
55use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
56use servers::grpc::FlightCompression;
57use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
58use servers::grpc::region_server::RegionServerHandler;
59use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
60use snafu::{OptionExt, ResultExt, ensure};
61use store_api::metric_engine_consts::{
62 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
63};
64use store_api::region_engine::{
65 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
66 SettableRegionRoleState,
67};
68use store_api::region_request::{
69 AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
70};
71use store_api::storage::RegionId;
72use tokio::sync::{Semaphore, SemaphorePermit};
73use tokio::time::timeout;
74use tonic::{Request, Response, Result as TonicResult};
75
76use crate::error::{
77 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
78 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
79 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
80 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
81 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
82 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
83};
84use crate::event_listener::RegionServerEventListenerRef;
85use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
86
87#[derive(Clone)]
88pub struct RegionServer {
89 inner: Arc<RegionServerInner>,
90 flight_compression: FlightCompression,
91}
92
93pub struct RegionStat {
94 pub region_id: RegionId,
95 pub engine: String,
96 pub role: RegionRole,
97}
98
99impl RegionServer {
100 pub fn new(
101 query_engine: QueryEngineRef,
102 runtime: Runtime,
103 event_listener: RegionServerEventListenerRef,
104 flight_compression: FlightCompression,
105 ) -> Self {
106 Self::with_table_provider(
107 query_engine,
108 runtime,
109 event_listener,
110 Arc::new(DummyTableProviderFactory),
111 0,
112 Duration::from_millis(0),
113 flight_compression,
114 )
115 }
116
117 pub fn with_table_provider(
118 query_engine: QueryEngineRef,
119 runtime: Runtime,
120 event_listener: RegionServerEventListenerRef,
121 table_provider_factory: TableProviderFactoryRef,
122 max_concurrent_queries: usize,
123 concurrent_query_limiter_timeout: Duration,
124 flight_compression: FlightCompression,
125 ) -> Self {
126 Self {
127 inner: Arc::new(RegionServerInner::new(
128 query_engine,
129 runtime,
130 event_listener,
131 table_provider_factory,
132 RegionServerParallelism::from_opts(
133 max_concurrent_queries,
134 concurrent_query_limiter_timeout,
135 ),
136 )),
137 flight_compression,
138 }
139 }
140
141 pub fn register_engine(&mut self, engine: RegionEngineRef) {
143 self.inner.register_engine(engine);
144 }
145
146 pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
148 self.inner.set_topic_stats_reporter(topic_stats_reporter);
149 }
150
151 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
153 match self.inner.get_engine(region_id, &RegionChange::None) {
154 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
155 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
156 Err(error::Error::RegionNotFound { .. }) => Ok(None),
157 Err(err) => Err(err),
158 }
159 }
160
161 #[tracing::instrument(skip_all)]
162 pub async fn handle_batch_open_requests(
163 &self,
164 parallelism: usize,
165 requests: Vec<(RegionId, RegionOpenRequest)>,
166 ignore_nonexistent_region: bool,
167 ) -> Result<Vec<RegionId>> {
168 self.inner
169 .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
170 .await
171 }
172
173 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
174 pub async fn handle_request(
175 &self,
176 region_id: RegionId,
177 request: RegionRequest,
178 ) -> Result<RegionResponse> {
179 self.inner.handle_request(region_id, request).await
180 }
181
182 async fn table_provider(
184 &self,
185 region_id: RegionId,
186 ctx: Option<QueryContextRef>,
187 ) -> Result<Arc<dyn TableProvider>> {
188 let status = self
189 .inner
190 .region_map
191 .get(®ion_id)
192 .context(RegionNotFoundSnafu { region_id })?
193 .clone();
194 ensure!(
195 matches!(status, RegionEngineWithStatus::Ready(_)),
196 RegionNotReadySnafu { region_id }
197 );
198
199 self.inner
200 .table_provider_factory
201 .create(region_id, status.into_engine(), ctx)
202 .await
203 .context(ExecuteLogicalPlanSnafu)
204 }
205
206 pub async fn handle_remote_read(
208 &self,
209 request: api::v1::region::QueryRequest,
210 query_ctx: QueryContextRef,
211 ) -> Result<SendableRecordBatchStream> {
212 let _permit = if let Some(p) = &self.inner.parallelism {
213 Some(p.acquire().await?)
214 } else {
215 None
216 };
217
218 let region_id = RegionId::from_u64(request.region_id);
219 let catalog_list = Arc::new(NameAwareCatalogList::new(
220 self.clone(),
221 region_id,
222 query_ctx.clone(),
223 ));
224
225 if query_ctx.explain_verbose() {
226 common_telemetry::info!("Handle remote read for region: {}", region_id);
227 }
228
229 let decoder = self
230 .inner
231 .query_engine
232 .engine_context(query_ctx.clone())
233 .new_plan_decoder()
234 .context(NewPlanDecoderSnafu)?;
235
236 let plan = decoder
237 .decode(Bytes::from(request.plan), catalog_list, false)
238 .await
239 .context(DecodeLogicalPlanSnafu)?;
240
241 self.inner
242 .handle_read(
243 QueryRequest {
244 header: request.header,
245 region_id,
246 plan,
247 },
248 query_ctx,
249 )
250 .await
251 }
252
253 #[tracing::instrument(skip_all)]
254 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
255 let _permit = if let Some(p) = &self.inner.parallelism {
256 Some(p.acquire().await?)
257 } else {
258 None
259 };
260
261 let ctx = request.header.as_ref().map(|h| h.into());
262 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
263
264 let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
265 .context(DataFusionSnafu)?;
266 let mut injector = injector_builder
267 .build(self, request.region_id, query_ctx.clone())
268 .await?;
269
270 let plan = request
271 .plan
272 .rewrite(&mut injector)
273 .context(DataFusionSnafu)?
274 .data;
275
276 self.inner
277 .handle_read(QueryRequest { plan, ..request }, query_ctx)
278 .await
279 }
280
281 pub fn reportable_regions(&self) -> Vec<RegionStat> {
285 self.inner
286 .region_map
287 .iter()
288 .filter_map(|e| {
289 let region_id = *e.key();
290 e.role(region_id).map(|role| RegionStat {
292 region_id,
293 engine: e.value().name().to_string(),
294 role,
295 })
296 })
297 .collect()
298 }
299
300 pub fn topic_stats(&self) -> Vec<TopicStat> {
302 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
303 let Some(reporter) = reporter.as_mut() else {
304 return vec![];
305 };
306 reporter
307 .reportable_topics()
308 .into_iter()
309 .map(|stat| TopicStat {
310 topic_name: stat.topic,
311 record_size: stat.record_size,
312 record_num: stat.record_num,
313 latest_entry_id: stat.latest_entry_id,
314 })
315 .collect()
316 }
317
318 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
319 self.inner.region_map.get(®ion_id).and_then(|engine| {
320 engine.role(region_id).map(|role| match role {
321 RegionRole::Follower => false,
322 RegionRole::Leader => true,
323 RegionRole::DowngradingLeader => true,
324 })
325 })
326 }
327
328 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
329 let engine = self
330 .inner
331 .region_map
332 .get(®ion_id)
333 .with_context(|| RegionNotFoundSnafu { region_id })?;
334 engine
335 .set_region_role(region_id, role)
336 .with_context(|_| HandleRegionRequestSnafu { region_id })
337 }
338
339 pub async fn set_region_role_state_gracefully(
349 &self,
350 region_id: RegionId,
351 state: SettableRegionRoleState,
352 ) -> Result<SetRegionRoleStateResponse> {
353 match self.inner.region_map.get(®ion_id) {
354 Some(engine) => Ok(engine
355 .set_region_role_state_gracefully(region_id, state)
356 .await
357 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
358 None => Ok(SetRegionRoleStateResponse::NotFound),
359 }
360 }
361
362 pub fn runtime(&self) -> Runtime {
363 self.inner.runtime.clone()
364 }
365
366 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
367 match self.inner.region_map.get(®ion_id) {
368 Some(e) => e.region_statistic(region_id),
369 None => None,
370 }
371 }
372
373 pub async fn stop(&self) -> Result<()> {
375 self.inner.stop().await
376 }
377
378 #[cfg(test)]
379 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
381 self.inner
382 .region_map
383 .insert(region_id, RegionEngineWithStatus::Ready(engine));
384 }
385
386 async fn handle_batch_ddl_requests(
387 &self,
388 request: region_request::Body,
389 ) -> Result<RegionResponse> {
390 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
392 .context(BuildRegionRequestsSnafu)?
393 .unwrap();
394 let tracing_context = TracingContext::from_current_span();
395
396 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
397 self.inner
398 .handle_batch_request(batch_request)
399 .trace(span)
400 .await
401 }
402
403 async fn handle_requests_in_parallel(
404 &self,
405 request: region_request::Body,
406 ) -> Result<RegionResponse> {
407 let requests =
408 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
409 let tracing_context = TracingContext::from_current_span();
410
411 let join_tasks = requests.into_iter().map(|(region_id, req)| {
412 let self_to_move = self;
413 let span = tracing_context.attach(info_span!(
414 "RegionServer::handle_region_request",
415 region_id = region_id.to_string()
416 ));
417 async move {
418 self_to_move
419 .handle_request(region_id, req)
420 .trace(span)
421 .await
422 }
423 });
424
425 let results = try_join_all(join_tasks).await?;
426 let mut affected_rows = 0;
427 let mut extensions = HashMap::new();
428 for result in results {
429 affected_rows += result.affected_rows;
430 extensions.extend(result.extensions);
431 }
432
433 Ok(RegionResponse {
434 affected_rows,
435 extensions,
436 metadata: Vec::new(),
437 })
438 }
439
440 async fn handle_requests_in_serial(
441 &self,
442 request: region_request::Body,
443 ) -> Result<RegionResponse> {
444 let requests =
445 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
446 let tracing_context = TracingContext::from_current_span();
447
448 let mut affected_rows = 0;
449 let mut extensions = HashMap::new();
450 for (region_id, req) in requests {
453 let span = tracing_context.attach(info_span!(
454 "RegionServer::handle_region_request",
455 region_id = region_id.to_string()
456 ));
457 let result = self.handle_request(region_id, req).trace(span).await?;
458
459 affected_rows += result.affected_rows;
460 extensions.extend(result.extensions);
461 }
462
463 Ok(RegionResponse {
464 affected_rows,
465 extensions,
466 metadata: Vec::new(),
467 })
468 }
469
470 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
471 let region_id = RegionId::from_u64(request.region_id);
472 let manifest_info = request
473 .manifest_info
474 .context(error::MissingRequiredFieldSnafu {
475 name: "manifest_info",
476 })?;
477
478 let manifest_info = match manifest_info {
479 ManifestInfo::MitoManifestInfo(info) => {
480 RegionManifestInfo::mito(info.data_manifest_version, 0)
481 }
482 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
483 info.data_manifest_version,
484 0,
485 info.metadata_manifest_version,
486 0,
487 ),
488 };
489
490 let tracing_context = TracingContext::from_current_span();
491 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
492
493 self.sync_region(region_id, manifest_info)
494 .trace(span)
495 .await
496 .map(|_| RegionResponse::new(AffectedRows::default()))
497 }
498
499 #[tracing::instrument(skip_all)]
504 async fn handle_list_metadata_request(
505 &self,
506 request: &ListMetadataRequest,
507 ) -> Result<RegionResponse> {
508 let mut region_metadatas = Vec::new();
509 for region_id in &request.region_ids {
511 let region_id = RegionId::from_u64(*region_id);
512 let Some(engine) = self.find_engine(region_id)? else {
514 region_metadatas.push(None);
515 continue;
516 };
517
518 match engine.get_metadata(region_id).await {
519 Ok(metadata) => region_metadatas.push(Some(metadata)),
520 Err(err) => {
521 if err.status_code() == StatusCode::RegionNotFound {
522 region_metadatas.push(None);
523 } else {
524 Err(err).with_context(|_| GetRegionMetadataSnafu {
525 engine: engine.name(),
526 region_id,
527 })?;
528 }
529 }
530 }
531 }
532
533 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
535
536 let response = RegionResponse::from_metadata(json_result);
537
538 Ok(response)
539 }
540
541 pub async fn sync_region(
543 &self,
544 region_id: RegionId,
545 manifest_info: RegionManifestInfo,
546 ) -> Result<()> {
547 let engine_with_status = self
548 .inner
549 .region_map
550 .get(®ion_id)
551 .with_context(|| RegionNotFoundSnafu { region_id })?;
552
553 self.inner
554 .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
555 .await
556 }
557}
558
559#[async_trait]
560impl RegionServerHandler for RegionServer {
561 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
562 let response = match &request {
563 region_request::Body::Creates(_)
564 | region_request::Body::Drops(_)
565 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
566 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
567 self.handle_requests_in_parallel(request).await
568 }
569 region_request::Body::Sync(sync_request) => {
570 self.handle_sync_region_request(sync_request).await
571 }
572 region_request::Body::ListMetadata(list_metadata_request) => {
573 self.handle_list_metadata_request(list_metadata_request)
574 .await
575 }
576 _ => self.handle_requests_in_serial(request).await,
577 }
578 .map_err(BoxedError::new)
579 .context(ExecuteGrpcRequestSnafu)?;
580
581 Ok(RegionResponseV1 {
582 header: Some(ResponseHeader {
583 status: Some(Status {
584 status_code: StatusCode::Success as _,
585 ..Default::default()
586 }),
587 }),
588 affected_rows: response.affected_rows as _,
589 extensions: response.extensions,
590 metadata: response.metadata,
591 })
592 }
593}
594
595#[async_trait]
596impl FlightCraft for RegionServer {
597 async fn do_get(
598 &self,
599 request: Request<Ticket>,
600 ) -> TonicResult<Response<TonicStream<FlightData>>> {
601 let ticket = request.into_inner().ticket;
602 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
603 .context(servers_error::InvalidFlightTicketSnafu)?;
604 let tracing_context = request
605 .header
606 .as_ref()
607 .map(|h| TracingContext::from_w3c(&h.tracing_context))
608 .unwrap_or_default();
609 let query_ctx = request
610 .header
611 .as_ref()
612 .map(|h| Arc::new(QueryContext::from(h)))
613 .unwrap_or(QueryContext::arc());
614
615 let result = self
616 .handle_remote_read(request, query_ctx.clone())
617 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
618 .await?;
619
620 let stream = Box::pin(FlightRecordBatchStream::new(
621 result,
622 tracing_context,
623 self.flight_compression,
624 query_ctx,
625 ));
626 Ok(Response::new(stream))
627 }
628}
629
630#[derive(Clone)]
631enum RegionEngineWithStatus {
632 Registering(RegionEngineRef),
634 Deregistering(RegionEngineRef),
636 Ready(RegionEngineRef),
638}
639
640impl RegionEngineWithStatus {
641 pub fn into_engine(self) -> RegionEngineRef {
643 match self {
644 RegionEngineWithStatus::Registering(engine) => engine,
645 RegionEngineWithStatus::Deregistering(engine) => engine,
646 RegionEngineWithStatus::Ready(engine) => engine,
647 }
648 }
649
650 pub fn engine(&self) -> &RegionEngineRef {
652 match self {
653 RegionEngineWithStatus::Registering(engine) => engine,
654 RegionEngineWithStatus::Deregistering(engine) => engine,
655 RegionEngineWithStatus::Ready(engine) => engine,
656 }
657 }
658}
659
660impl Deref for RegionEngineWithStatus {
661 type Target = RegionEngineRef;
662
663 fn deref(&self) -> &Self::Target {
664 match self {
665 RegionEngineWithStatus::Registering(engine) => engine,
666 RegionEngineWithStatus::Deregistering(engine) => engine,
667 RegionEngineWithStatus::Ready(engine) => engine,
668 }
669 }
670}
671
672struct RegionServerInner {
673 engines: RwLock<HashMap<String, RegionEngineRef>>,
674 region_map: DashMap<RegionId, RegionEngineWithStatus>,
675 query_engine: QueryEngineRef,
676 runtime: Runtime,
677 event_listener: RegionServerEventListenerRef,
678 table_provider_factory: TableProviderFactoryRef,
679 parallelism: Option<RegionServerParallelism>,
682 topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
684 mito_engine: RwLock<Option<MitoEngine>>,
688}
689
690struct RegionServerParallelism {
691 semaphore: Semaphore,
692 timeout: Duration,
693}
694
695impl RegionServerParallelism {
696 pub fn from_opts(
697 max_concurrent_queries: usize,
698 concurrent_query_limiter_timeout: Duration,
699 ) -> Option<Self> {
700 if max_concurrent_queries == 0 {
701 return None;
702 }
703 Some(RegionServerParallelism {
704 semaphore: Semaphore::new(max_concurrent_queries),
705 timeout: concurrent_query_limiter_timeout,
706 })
707 }
708
709 pub async fn acquire(&self) -> Result<SemaphorePermit> {
710 timeout(self.timeout, self.semaphore.acquire())
711 .await
712 .context(ConcurrentQueryLimiterTimeoutSnafu)?
713 .context(ConcurrentQueryLimiterClosedSnafu)
714 }
715}
716
717enum CurrentEngine {
718 Engine(RegionEngineRef),
719 EarlyReturn(AffectedRows),
720}
721
722impl Debug for CurrentEngine {
723 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
724 match self {
725 CurrentEngine::Engine(engine) => f
726 .debug_struct("CurrentEngine")
727 .field("engine", &engine.name())
728 .finish(),
729 CurrentEngine::EarlyReturn(rows) => f
730 .debug_struct("CurrentEngine")
731 .field("return", rows)
732 .finish(),
733 }
734 }
735}
736
737impl RegionServerInner {
738 pub fn new(
739 query_engine: QueryEngineRef,
740 runtime: Runtime,
741 event_listener: RegionServerEventListenerRef,
742 table_provider_factory: TableProviderFactoryRef,
743 parallelism: Option<RegionServerParallelism>,
744 ) -> Self {
745 Self {
746 engines: RwLock::new(HashMap::new()),
747 region_map: DashMap::new(),
748 query_engine,
749 runtime,
750 event_listener,
751 table_provider_factory,
752 parallelism,
753 topic_stats_reporter: RwLock::new(None),
754 mito_engine: RwLock::new(None),
755 }
756 }
757
758 pub fn register_engine(&self, engine: RegionEngineRef) {
759 let engine_name = engine.name();
760 if engine_name == MITO_ENGINE_NAME
761 && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
762 {
763 *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
764 }
765
766 info!("Region Engine {engine_name} is registered");
767 self.engines
768 .write()
769 .unwrap()
770 .insert(engine_name.to_string(), engine);
771 }
772
773 pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
774 info!("Set topic stats reporter");
775 *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
776 }
777
778 fn get_engine(
779 &self,
780 region_id: RegionId,
781 region_change: &RegionChange,
782 ) -> Result<CurrentEngine> {
783 let current_region_status = self.region_map.get(®ion_id);
784
785 let engine = match region_change {
786 RegionChange::Register(attribute) => match current_region_status {
787 Some(status) => match status.clone() {
788 RegionEngineWithStatus::Registering(engine) => engine,
789 RegionEngineWithStatus::Deregistering(_) => {
790 return error::RegionBusySnafu { region_id }.fail();
791 }
792 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
793 },
794 _ => self
795 .engines
796 .read()
797 .unwrap()
798 .get(attribute.engine())
799 .with_context(|| RegionEngineNotFoundSnafu {
800 name: attribute.engine(),
801 })?
802 .clone(),
803 },
804 RegionChange::Deregisters => match current_region_status {
805 Some(status) => match status.clone() {
806 RegionEngineWithStatus::Registering(_) => {
807 return error::RegionBusySnafu { region_id }.fail();
808 }
809 RegionEngineWithStatus::Deregistering(_) => {
810 return Ok(CurrentEngine::EarlyReturn(0));
811 }
812 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
813 },
814 None => return Ok(CurrentEngine::EarlyReturn(0)),
815 },
816 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
817 match current_region_status {
818 Some(status) => match status.clone() {
819 RegionEngineWithStatus::Registering(_) => {
820 return error::RegionNotReadySnafu { region_id }.fail();
821 }
822 RegionEngineWithStatus::Deregistering(_) => {
823 return error::RegionNotFoundSnafu { region_id }.fail();
824 }
825 RegionEngineWithStatus::Ready(engine) => engine,
826 },
827 None => return error::RegionNotFoundSnafu { region_id }.fail(),
828 }
829 }
830 };
831
832 Ok(CurrentEngine::Engine(engine))
833 }
834
835 async fn handle_batch_open_requests_inner(
836 &self,
837 engine: RegionEngineRef,
838 parallelism: usize,
839 requests: Vec<(RegionId, RegionOpenRequest)>,
840 ignore_nonexistent_region: bool,
841 ) -> Result<Vec<RegionId>> {
842 let region_changes = requests
843 .iter()
844 .map(|(region_id, open)| {
845 let attribute = parse_region_attribute(&open.engine, &open.options)?;
846 Ok((*region_id, RegionChange::Register(attribute)))
847 })
848 .collect::<Result<HashMap<_, _>>>()?;
849
850 for (®ion_id, region_change) in ®ion_changes {
851 self.set_region_status_not_ready(region_id, &engine, region_change)
852 }
853
854 let mut open_regions = Vec::with_capacity(requests.len());
855 let mut errors = vec![];
856 match engine
857 .handle_batch_open_requests(parallelism, requests)
858 .await
859 .with_context(|_| HandleBatchOpenRequestSnafu)
860 {
861 Ok(results) => {
862 for (region_id, result) in results {
863 let region_change = ®ion_changes[®ion_id];
864 match result {
865 Ok(_) => {
866 if let Err(e) = self
867 .set_region_status_ready(region_id, engine.clone(), *region_change)
868 .await
869 {
870 error!(e; "Failed to set region to ready: {}", region_id);
871 errors.push(BoxedError::new(e));
872 } else {
873 open_regions.push(region_id)
874 }
875 }
876 Err(e) => {
877 self.unset_region_status(region_id, &engine, *region_change);
878 if e.status_code() == StatusCode::RegionNotFound
879 && ignore_nonexistent_region
880 {
881 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
882 } else {
883 error!(e; "Failed to open region: {}", region_id);
884 errors.push(e);
885 }
886 }
887 }
888 }
889 }
890 Err(e) => {
891 for (®ion_id, region_change) in ®ion_changes {
892 self.unset_region_status(region_id, &engine, *region_change);
893 }
894 error!(e; "Failed to open batch regions");
895 errors.push(BoxedError::new(e));
896 }
897 }
898
899 if !errors.is_empty() {
900 return error::UnexpectedSnafu {
901 violated: format!("Failed to open batch regions: {:?}", errors[0]),
903 }
904 .fail();
905 }
906
907 Ok(open_regions)
908 }
909
910 pub async fn handle_batch_open_requests(
911 &self,
912 parallelism: usize,
913 requests: Vec<(RegionId, RegionOpenRequest)>,
914 ignore_nonexistent_region: bool,
915 ) -> Result<Vec<RegionId>> {
916 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
917 HashMap::with_capacity(requests.len());
918 for (region_id, request) in requests {
919 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
920 requests.push((region_id, request));
921 } else {
922 engine_grouped_requests
923 .insert(request.engine.to_string(), vec![(region_id, request)]);
924 }
925 }
926
927 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
928 for (engine, requests) in engine_grouped_requests {
929 let engine = self
930 .engines
931 .read()
932 .unwrap()
933 .get(&engine)
934 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
935 .clone();
936 results.push(
937 self.handle_batch_open_requests_inner(
938 engine,
939 parallelism,
940 requests,
941 ignore_nonexistent_region,
942 )
943 .await,
944 )
945 }
946
947 Ok(results
948 .into_iter()
949 .collect::<Result<Vec<_>>>()?
950 .into_iter()
951 .flatten()
952 .collect::<Vec<_>>())
953 }
954
955 pub async fn handle_batch_request(
959 &self,
960 batch_request: BatchRegionDdlRequest,
961 ) -> Result<RegionResponse> {
962 let region_changes = match &batch_request {
963 BatchRegionDdlRequest::Create(requests) => requests
964 .iter()
965 .map(|(region_id, create)| {
966 let attribute = parse_region_attribute(&create.engine, &create.options)?;
967 Ok((*region_id, RegionChange::Register(attribute)))
968 })
969 .collect::<Result<Vec<_>>>()?,
970 BatchRegionDdlRequest::Drop(requests) => requests
971 .iter()
972 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
973 .collect::<Vec<_>>(),
974 BatchRegionDdlRequest::Alter(requests) => requests
975 .iter()
976 .map(|(region_id, _)| (*region_id, RegionChange::None))
977 .collect::<Vec<_>>(),
978 };
979
980 let (first_region_id, first_region_change) = region_changes.first().unwrap();
983 let engine = match self.get_engine(*first_region_id, first_region_change)? {
984 CurrentEngine::Engine(engine) => engine,
985 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
986 };
987
988 for (region_id, region_change) in region_changes.iter() {
989 self.set_region_status_not_ready(*region_id, &engine, region_change);
990 }
991
992 let ddl_type = batch_request.request_type();
993 let result = engine
994 .handle_batch_ddl_requests(batch_request)
995 .await
996 .context(HandleBatchDdlRequestSnafu { ddl_type });
997
998 match result {
999 Ok(result) => {
1000 for (region_id, region_change) in ®ion_changes {
1001 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1002 .await?;
1003 }
1004
1005 Ok(RegionResponse {
1006 affected_rows: result.affected_rows,
1007 extensions: result.extensions,
1008 metadata: Vec::new(),
1009 })
1010 }
1011 Err(err) => {
1012 for (region_id, region_change) in region_changes {
1013 self.unset_region_status(region_id, &engine, region_change);
1014 }
1015
1016 Err(err)
1017 }
1018 }
1019 }
1020
1021 pub async fn handle_request(
1022 &self,
1023 region_id: RegionId,
1024 request: RegionRequest,
1025 ) -> Result<RegionResponse> {
1026 let request_type = request.request_type();
1027 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1028 .with_label_values(&[request_type])
1029 .start_timer();
1030
1031 let region_change = match &request {
1032 RegionRequest::Create(create) => {
1033 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1034 RegionChange::Register(attribute)
1035 }
1036 RegionRequest::Open(open) => {
1037 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1038 RegionChange::Register(attribute)
1039 }
1040 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1041 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1042 RegionChange::Ingest
1043 }
1044 RegionRequest::Alter(_)
1045 | RegionRequest::Flush(_)
1046 | RegionRequest::Compact(_)
1047 | RegionRequest::Truncate(_) => RegionChange::None,
1048 RegionRequest::Catchup(_) => RegionChange::Catchup,
1049 };
1050
1051 let engine = match self.get_engine(region_id, ®ion_change)? {
1052 CurrentEngine::Engine(engine) => engine,
1053 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1054 };
1055
1056 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1058
1059 match engine
1060 .handle_request(region_id, request)
1061 .await
1062 .with_context(|_| HandleRegionRequestSnafu { region_id })
1063 {
1064 Ok(result) => {
1065 if matches!(region_change, RegionChange::Ingest) {
1067 crate::metrics::REGION_CHANGED_ROW_COUNT
1068 .with_label_values(&[request_type])
1069 .inc_by(result.affected_rows as u64);
1070 }
1071 self.set_region_status_ready(region_id, engine.clone(), region_change)
1073 .await?;
1074
1075 Ok(RegionResponse {
1076 affected_rows: result.affected_rows,
1077 extensions: result.extensions,
1078 metadata: Vec::new(),
1079 })
1080 }
1081 Err(err) => {
1082 self.unset_region_status(region_id, &engine, region_change);
1084 Err(err)
1085 }
1086 }
1087 }
1088
1089 pub async fn handle_sync_region(
1091 &self,
1092 engine: &RegionEngineRef,
1093 region_id: RegionId,
1094 manifest_info: RegionManifestInfo,
1095 ) -> Result<()> {
1096 let Some(new_opened_regions) = engine
1097 .sync_region(region_id, manifest_info)
1098 .await
1099 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1100 .new_opened_logical_region_ids()
1101 else {
1102 warn!("No new opened logical regions");
1103 return Ok(());
1104 };
1105
1106 for region in new_opened_regions {
1107 self.region_map
1108 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1109 info!("Logical region {} is registered!", region);
1110 }
1111
1112 Ok(())
1113 }
1114
1115 fn set_region_status_not_ready(
1116 &self,
1117 region_id: RegionId,
1118 engine: &RegionEngineRef,
1119 region_change: &RegionChange,
1120 ) {
1121 match region_change {
1122 RegionChange::Register(_) => {
1123 self.region_map.insert(
1124 region_id,
1125 RegionEngineWithStatus::Registering(engine.clone()),
1126 );
1127 }
1128 RegionChange::Deregisters => {
1129 self.region_map.insert(
1130 region_id,
1131 RegionEngineWithStatus::Deregistering(engine.clone()),
1132 );
1133 }
1134 _ => {}
1135 }
1136 }
1137
1138 fn unset_region_status(
1139 &self,
1140 region_id: RegionId,
1141 engine: &RegionEngineRef,
1142 region_change: RegionChange,
1143 ) {
1144 match region_change {
1145 RegionChange::None | RegionChange::Ingest => {}
1146 RegionChange::Register(_) => {
1147 self.region_map.remove(®ion_id);
1148 }
1149 RegionChange::Deregisters => {
1150 self.region_map
1151 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1152 }
1153 RegionChange::Catchup => {}
1154 }
1155 }
1156
1157 async fn set_region_status_ready(
1158 &self,
1159 region_id: RegionId,
1160 engine: RegionEngineRef,
1161 region_change: RegionChange,
1162 ) -> Result<()> {
1163 let engine_type = engine.name();
1164 match region_change {
1165 RegionChange::None | RegionChange::Ingest => {}
1166 RegionChange::Register(attribute) => {
1167 info!(
1168 "Region {region_id} is registered to engine {}",
1169 attribute.engine()
1170 );
1171 self.region_map
1172 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1173
1174 match attribute {
1175 RegionAttribute::Metric { physical } => {
1176 if physical {
1177 self.register_logical_regions(&engine, region_id).await?;
1179 self.event_listener.on_region_registered(region_id);
1181 }
1182 }
1183 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1184 RegionAttribute::File => {
1185 }
1187 }
1188 }
1189 RegionChange::Deregisters => {
1190 info!("Region {region_id} is deregistered from engine {engine_type}");
1191 self.region_map
1192 .remove(®ion_id)
1193 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1194 self.event_listener.on_region_deregistered(region_id);
1195 }
1196 RegionChange::Catchup => {
1197 if is_metric_engine(engine.name()) {
1198 self.register_logical_regions(&engine, region_id).await?;
1200 }
1201 }
1202 }
1203 Ok(())
1204 }
1205
1206 async fn register_logical_regions(
1207 &self,
1208 engine: &RegionEngineRef,
1209 physical_region_id: RegionId,
1210 ) -> Result<()> {
1211 let metric_engine =
1212 engine
1213 .as_any()
1214 .downcast_ref::<MetricEngine>()
1215 .context(UnexpectedSnafu {
1216 violated: format!(
1217 "expecting engine type '{}', actual '{}'",
1218 METRIC_ENGINE_NAME,
1219 engine.name(),
1220 ),
1221 })?;
1222
1223 let logical_regions = metric_engine
1224 .logical_regions(physical_region_id)
1225 .await
1226 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1227
1228 for region in logical_regions {
1229 self.region_map
1230 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1231 info!("Logical region {} is registered!", region);
1232 }
1233 Ok(())
1234 }
1235
1236 pub async fn handle_read(
1237 &self,
1238 request: QueryRequest,
1239 query_ctx: QueryContextRef,
1240 ) -> Result<SendableRecordBatchStream> {
1241 let result = self
1244 .query_engine
1245 .execute(request.plan, query_ctx)
1246 .await
1247 .context(ExecuteLogicalPlanSnafu)?;
1248
1249 match result.data {
1250 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1251 UnsupportedOutputSnafu { expected: "stream" }.fail()
1252 }
1253 OutputData::Stream(stream) => Ok(stream),
1254 }
1255 }
1256
1257 async fn stop(&self) -> Result<()> {
1258 let regions = self
1266 .region_map
1267 .iter()
1268 .map(|x| (*x.key(), x.value().clone()))
1269 .collect::<Vec<_>>();
1270 let num_regions = regions.len();
1271
1272 for (region_id, engine) in regions {
1273 let closed = engine
1274 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1275 .await;
1276 match closed {
1277 Ok(_) => debug!("Region {region_id} is closed"),
1278 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1279 }
1280 }
1281 self.region_map.clear();
1282 info!("closed {num_regions} regions");
1283
1284 drop(self.mito_engine.write().unwrap().take());
1285 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1286 for (engine_name, engine) in engines {
1287 engine
1288 .stop()
1289 .await
1290 .context(StopRegionEngineSnafu { name: &engine_name })?;
1291 info!("Region engine {engine_name} is stopped");
1292 }
1293
1294 Ok(())
1295 }
1296}
1297
/// How a request affects a region's entry in the region map.
///
/// The status helpers (`set_region_status_not_ready`, `unset_region_status`,
/// `set_region_status_ready`) dispatch on this value.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// No region-map change in any of the status helpers.
    None,
    /// The region is stored as `Registering` while in flight and as `Ready`
    /// once done; on failure its entry is removed.
    Register(RegionAttribute),
    /// The region is stored as `Deregistering` while in flight and removed
    /// once done; on failure it is restored to `Ready`.
    Deregisters,
    /// Not marked transitional; once done, logical regions are registered
    /// when the engine is the metric engine.
    Catchup,
    /// No region-map change in any of the status helpers.
    Ingest,
}
1306
/// Returns `true` when `engine` is exactly [`METRIC_ENGINE_NAME`].
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1310
1311fn parse_region_attribute(
1312 engine: &str,
1313 options: &HashMap<String, String>,
1314) -> Result<RegionAttribute> {
1315 match engine {
1316 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1317 METRIC_ENGINE_NAME => {
1318 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1319
1320 Ok(RegionAttribute::Metric { physical })
1321 }
1322 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1323 _ => error::UnexpectedSnafu {
1324 violated: format!("Unknown engine: {}", engine),
1325 }
1326 .fail(),
1327 }
1328}
1329
/// The engine kind of a region being registered, as produced by
/// `parse_region_attribute`.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// Region served by the engine named `MITO_ENGINE_NAME`.
    Mito,
    /// Region served by the engine named `METRIC_ENGINE_NAME`.
    /// `physical` is `true` when the region options carry no
    /// `LOGICAL_TABLE_METADATA_KEY` entry.
    Metric { physical: bool },
    /// Region served by the engine named `FILE_ENGINE_NAME`.
    File,
}
1336
1337impl RegionAttribute {
1338 fn engine(&self) -> &'static str {
1339 match self {
1340 RegionAttribute::Mito => MITO_ENGINE_NAME,
1341 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1342 RegionAttribute::File => FILE_ENGINE_NAME,
1343 }
1344 }
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349
1350 use std::assert_matches::assert_matches;
1351
1352 use api::v1::SemanticType;
1353 use common_error::ext::ErrorExt;
1354 use datatypes::prelude::ConcreteDataType;
1355 use mito2::test_util::CreateRequestBuilder;
1356 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1357 use store_api::region_engine::RegionEngine;
1358 use store_api::region_request::{
1359 PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1360 };
1361 use store_api::storage::RegionId;
1362
1363 use super::*;
1364 use crate::error::Result;
1365 use crate::tests::{MockRegionEngine, mock_region_server};
1366
1367 #[tokio::test]
1368 async fn test_region_registering() {
1369 common_telemetry::init_default_ut_logging();
1370
1371 let mut mock_region_server = mock_region_server();
1372 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1373 let engine_name = engine.name();
1374 mock_region_server.register_engine(engine.clone());
1375 let region_id = RegionId::new(1, 1);
1376 let builder = CreateRequestBuilder::new();
1377 let create_req = builder.build();
1378 mock_region_server.inner.region_map.insert(
1380 region_id,
1381 RegionEngineWithStatus::Registering(engine.clone()),
1382 );
1383 let response = mock_region_server
1384 .handle_request(region_id, RegionRequest::Create(create_req))
1385 .await
1386 .unwrap();
1387 assert_eq!(response.affected_rows, 0);
1388 let status = mock_region_server
1389 .inner
1390 .region_map
1391 .get(®ion_id)
1392 .unwrap()
1393 .clone();
1394 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1395
1396 mock_region_server.inner.region_map.insert(
1397 region_id,
1398 RegionEngineWithStatus::Registering(engine.clone()),
1399 );
1400 let response = mock_region_server
1401 .handle_request(
1402 region_id,
1403 RegionRequest::Open(RegionOpenRequest {
1404 engine: engine_name.to_string(),
1405 table_dir: String::new(),
1406 path_type: PathType::Bare,
1407 options: Default::default(),
1408 skip_wal_replay: false,
1409 checkpoint: None,
1410 }),
1411 )
1412 .await
1413 .unwrap();
1414 assert_eq!(response.affected_rows, 0);
1415 let status = mock_region_server
1416 .inner
1417 .region_map
1418 .get(®ion_id)
1419 .unwrap()
1420 .clone();
1421 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1422 }
1423
1424 #[tokio::test]
1425 async fn test_region_deregistering() {
1426 common_telemetry::init_default_ut_logging();
1427
1428 let mut mock_region_server = mock_region_server();
1429 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1430
1431 mock_region_server.register_engine(engine.clone());
1432
1433 let region_id = RegionId::new(1, 1);
1434
1435 mock_region_server.inner.region_map.insert(
1437 region_id,
1438 RegionEngineWithStatus::Deregistering(engine.clone()),
1439 );
1440
1441 let response = mock_region_server
1442 .handle_request(
1443 region_id,
1444 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1445 )
1446 .await
1447 .unwrap();
1448 assert_eq!(response.affected_rows, 0);
1449
1450 let status = mock_region_server
1451 .inner
1452 .region_map
1453 .get(®ion_id)
1454 .unwrap()
1455 .clone();
1456 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1457
1458 mock_region_server.inner.region_map.insert(
1459 region_id,
1460 RegionEngineWithStatus::Deregistering(engine.clone()),
1461 );
1462
1463 let response = mock_region_server
1464 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1465 .await
1466 .unwrap();
1467 assert_eq!(response.affected_rows, 0);
1468
1469 let status = mock_region_server
1470 .inner
1471 .region_map
1472 .get(®ion_id)
1473 .unwrap()
1474 .clone();
1475 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1476 }
1477
1478 #[tokio::test]
1479 async fn test_region_not_ready() {
1480 common_telemetry::init_default_ut_logging();
1481
1482 let mut mock_region_server = mock_region_server();
1483 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1484
1485 mock_region_server.register_engine(engine.clone());
1486
1487 let region_id = RegionId::new(1, 1);
1488
1489 mock_region_server.inner.region_map.insert(
1491 region_id,
1492 RegionEngineWithStatus::Registering(engine.clone()),
1493 );
1494
1495 let err = mock_region_server
1496 .handle_request(
1497 region_id,
1498 RegionRequest::Truncate(RegionTruncateRequest::All),
1499 )
1500 .await
1501 .unwrap_err();
1502
1503 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1504 }
1505
1506 #[tokio::test]
1507 async fn test_region_request_failed() {
1508 common_telemetry::init_default_ut_logging();
1509
1510 let mut mock_region_server = mock_region_server();
1511 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1512 MITO_ENGINE_NAME,
1513 Box::new(|_region_id, _request| {
1514 error::UnexpectedSnafu {
1515 violated: "test".to_string(),
1516 }
1517 .fail()
1518 }),
1519 );
1520
1521 mock_region_server.register_engine(engine.clone());
1522
1523 let region_id = RegionId::new(1, 1);
1524 let builder = CreateRequestBuilder::new();
1525 let create_req = builder.build();
1526 mock_region_server
1527 .handle_request(region_id, RegionRequest::Create(create_req))
1528 .await
1529 .unwrap_err();
1530
1531 let status = mock_region_server.inner.region_map.get(®ion_id);
1532 assert!(status.is_none());
1533
1534 mock_region_server
1535 .inner
1536 .region_map
1537 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1538
1539 mock_region_server
1540 .handle_request(
1541 region_id,
1542 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1543 )
1544 .await
1545 .unwrap_err();
1546
1547 let status = mock_region_server.inner.region_map.get(®ion_id);
1548 assert!(status.is_some());
1549 }
1550
1551 #[tokio::test]
1552 async fn test_batch_open_region_ignore_nonexistent_regions() {
1553 common_telemetry::init_default_ut_logging();
1554 let mut mock_region_server = mock_region_server();
1555 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1556 MITO_ENGINE_NAME,
1557 Box::new(|region_id, _request| {
1558 if region_id == RegionId::new(1, 1) {
1559 error::RegionNotFoundSnafu { region_id }.fail()
1560 } else {
1561 Ok(0)
1562 }
1563 }),
1564 );
1565 mock_region_server.register_engine(engine.clone());
1566
1567 let region_ids = mock_region_server
1568 .handle_batch_open_requests(
1569 8,
1570 vec![
1571 (
1572 RegionId::new(1, 1),
1573 RegionOpenRequest {
1574 engine: MITO_ENGINE_NAME.to_string(),
1575 table_dir: String::new(),
1576 path_type: PathType::Bare,
1577 options: Default::default(),
1578 skip_wal_replay: false,
1579 checkpoint: None,
1580 },
1581 ),
1582 (
1583 RegionId::new(1, 2),
1584 RegionOpenRequest {
1585 engine: MITO_ENGINE_NAME.to_string(),
1586 table_dir: String::new(),
1587 path_type: PathType::Bare,
1588 options: Default::default(),
1589 skip_wal_replay: false,
1590 checkpoint: None,
1591 },
1592 ),
1593 ],
1594 true,
1595 )
1596 .await
1597 .unwrap();
1598 assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1599
1600 let err = mock_region_server
1601 .handle_batch_open_requests(
1602 8,
1603 vec![
1604 (
1605 RegionId::new(1, 1),
1606 RegionOpenRequest {
1607 engine: MITO_ENGINE_NAME.to_string(),
1608 table_dir: String::new(),
1609 path_type: PathType::Bare,
1610 options: Default::default(),
1611 skip_wal_replay: false,
1612 checkpoint: None,
1613 },
1614 ),
1615 (
1616 RegionId::new(1, 2),
1617 RegionOpenRequest {
1618 engine: MITO_ENGINE_NAME.to_string(),
1619 table_dir: String::new(),
1620 path_type: PathType::Bare,
1621 options: Default::default(),
1622 skip_wal_replay: false,
1623 checkpoint: None,
1624 },
1625 ),
1626 ],
1627 false,
1628 )
1629 .await
1630 .unwrap_err();
1631 assert_eq!(err.status_code(), StatusCode::Unexpected);
1632 }
1633
/// One table-driven case for `test_current_engine`.
struct CurrentEngineTest {
    /// Region whose engine is looked up.
    region_id: RegionId,
    /// Status placed in the region map before the lookup; `None` removes the entry.
    current_region_status: Option<RegionEngineWithStatus>,
    /// Region change passed to `get_engine`.
    region_change: RegionChange,
    /// Check applied to the lookup result.
    assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
}
1640
1641 #[tokio::test]
1642 async fn test_current_engine() {
1643 common_telemetry::init_default_ut_logging();
1644
1645 let mut mock_region_server = mock_region_server();
1646 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1647 mock_region_server.register_engine(engine.clone());
1648
1649 let region_id = RegionId::new(1024, 1);
1650 let tests = vec![
1651 CurrentEngineTest {
1653 region_id,
1654 current_region_status: None,
1655 region_change: RegionChange::None,
1656 assert: Box::new(|result| {
1657 let err = result.unwrap_err();
1658 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1659 }),
1660 },
1661 CurrentEngineTest {
1662 region_id,
1663 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1664 region_change: RegionChange::None,
1665 assert: Box::new(|result| {
1666 let current_engine = result.unwrap();
1667 assert_matches!(current_engine, CurrentEngine::Engine(_));
1668 }),
1669 },
1670 CurrentEngineTest {
1671 region_id,
1672 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1673 region_change: RegionChange::None,
1674 assert: Box::new(|result| {
1675 let err = result.unwrap_err();
1676 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1677 }),
1678 },
1679 CurrentEngineTest {
1680 region_id,
1681 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1682 region_change: RegionChange::None,
1683 assert: Box::new(|result| {
1684 let err = result.unwrap_err();
1685 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1686 }),
1687 },
1688 CurrentEngineTest {
1690 region_id,
1691 current_region_status: None,
1692 region_change: RegionChange::Register(RegionAttribute::Mito),
1693 assert: Box::new(|result| {
1694 let current_engine = result.unwrap();
1695 assert_matches!(current_engine, CurrentEngine::Engine(_));
1696 }),
1697 },
1698 CurrentEngineTest {
1699 region_id,
1700 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1701 region_change: RegionChange::Register(RegionAttribute::Mito),
1702 assert: Box::new(|result| {
1703 let current_engine = result.unwrap();
1704 assert_matches!(current_engine, CurrentEngine::Engine(_));
1705 }),
1706 },
1707 CurrentEngineTest {
1708 region_id,
1709 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1710 region_change: RegionChange::Register(RegionAttribute::Mito),
1711 assert: Box::new(|result| {
1712 let err = result.unwrap_err();
1713 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1714 }),
1715 },
1716 CurrentEngineTest {
1717 region_id,
1718 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1719 region_change: RegionChange::Register(RegionAttribute::Mito),
1720 assert: Box::new(|result| {
1721 let current_engine = result.unwrap();
1722 assert_matches!(current_engine, CurrentEngine::Engine(_));
1723 }),
1724 },
1725 CurrentEngineTest {
1727 region_id,
1728 current_region_status: None,
1729 region_change: RegionChange::Deregisters,
1730 assert: Box::new(|result| {
1731 let current_engine = result.unwrap();
1732 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1733 }),
1734 },
1735 CurrentEngineTest {
1736 region_id,
1737 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1738 region_change: RegionChange::Deregisters,
1739 assert: Box::new(|result| {
1740 let err = result.unwrap_err();
1741 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1742 }),
1743 },
1744 CurrentEngineTest {
1745 region_id,
1746 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1747 region_change: RegionChange::Deregisters,
1748 assert: Box::new(|result| {
1749 let current_engine = result.unwrap();
1750 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1751 }),
1752 },
1753 CurrentEngineTest {
1754 region_id,
1755 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1756 region_change: RegionChange::Deregisters,
1757 assert: Box::new(|result| {
1758 let current_engine = result.unwrap();
1759 assert_matches!(current_engine, CurrentEngine::Engine(_));
1760 }),
1761 },
1762 ];
1763
1764 for test in tests {
1765 let CurrentEngineTest {
1766 region_id,
1767 current_region_status,
1768 region_change,
1769 assert,
1770 } = test;
1771
1772 if let Some(status) = current_region_status {
1774 mock_region_server
1775 .inner
1776 .region_map
1777 .insert(region_id, status);
1778 } else {
1779 mock_region_server.inner.region_map.remove(®ion_id);
1780 }
1781
1782 let result = mock_region_server
1783 .inner
1784 .get_engine(region_id, ®ion_change);
1785
1786 assert(result);
1787 }
1788 }
1789
1790 #[tokio::test]
1791 async fn test_region_server_parallelism() {
1792 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1793 let first_query = p.acquire().await;
1794 assert!(first_query.is_ok());
1795 let second_query = p.acquire().await;
1796 assert!(second_query.is_ok());
1797 let third_query = p.acquire().await;
1798 assert!(third_query.is_err());
1799 let err = third_query.unwrap_err();
1800 assert_eq!(
1801 err.output_msg(),
1802 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1803 );
1804 drop(first_query);
1805 let forth_query = p.acquire().await;
1806 assert!(forth_query.is_ok());
1807 }
1808
1809 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1810 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1811 metadata_builder.push_column_metadata(ColumnMetadata {
1812 column_schema: datatypes::schema::ColumnSchema::new(
1813 "timestamp",
1814 ConcreteDataType::timestamp_nanosecond_datatype(),
1815 false,
1816 ),
1817 semantic_type: SemanticType::Timestamp,
1818 column_id: 0,
1819 });
1820 metadata_builder.push_column_metadata(ColumnMetadata {
1821 column_schema: datatypes::schema::ColumnSchema::new(
1822 "file",
1823 ConcreteDataType::string_datatype(),
1824 true,
1825 ),
1826 semantic_type: SemanticType::Tag,
1827 column_id: 1,
1828 });
1829 metadata_builder.push_column_metadata(ColumnMetadata {
1830 column_schema: datatypes::schema::ColumnSchema::new(
1831 "message",
1832 ConcreteDataType::string_datatype(),
1833 true,
1834 ),
1835 semantic_type: SemanticType::Field,
1836 column_id: 2,
1837 });
1838 metadata_builder.primary_key(vec![1]);
1839 metadata_builder.build().unwrap()
1840 }
1841
1842 #[tokio::test]
1843 async fn test_handle_list_metadata_request() {
1844 common_telemetry::init_default_ut_logging();
1845
1846 let mut mock_region_server = mock_region_server();
1847 let region_id_1 = RegionId::new(1, 0);
1848 let region_id_2 = RegionId::new(2, 0);
1849
1850 let metadata_1 = mock_region_metadata(region_id_1);
1851 let metadata_2 = mock_region_metadata(region_id_2);
1852 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
1853
1854 let metadata_1 = Arc::new(metadata_1);
1855 let metadata_2 = Arc::new(metadata_2);
1856 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1857 MITO_ENGINE_NAME,
1858 Box::new(move |region_id| {
1859 if region_id == region_id_1 {
1860 Ok(metadata_1.clone())
1861 } else if region_id == region_id_2 {
1862 Ok(metadata_2.clone())
1863 } else {
1864 error::RegionNotFoundSnafu { region_id }.fail()
1865 }
1866 }),
1867 );
1868
1869 mock_region_server.register_engine(engine.clone());
1870 mock_region_server
1871 .inner
1872 .region_map
1873 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1874 mock_region_server
1875 .inner
1876 .region_map
1877 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1878
1879 let list_metadata_request = ListMetadataRequest {
1881 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1882 };
1883 let response = mock_region_server
1884 .handle_list_metadata_request(&list_metadata_request)
1885 .await
1886 .unwrap();
1887 let decoded_metadata: Vec<Option<RegionMetadata>> =
1888 serde_json::from_slice(&response.metadata).unwrap();
1889 assert_eq!(metadatas, decoded_metadata);
1890 }
1891
1892 #[tokio::test]
1893 async fn test_handle_list_metadata_not_found() {
1894 common_telemetry::init_default_ut_logging();
1895
1896 let mut mock_region_server = mock_region_server();
1897 let region_id_1 = RegionId::new(1, 0);
1898 let region_id_2 = RegionId::new(2, 0);
1899
1900 let metadata_1 = mock_region_metadata(region_id_1);
1901 let metadatas = vec![Some(metadata_1.clone()), None];
1902
1903 let metadata_1 = Arc::new(metadata_1);
1904 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1905 MITO_ENGINE_NAME,
1906 Box::new(move |region_id| {
1907 if region_id == region_id_1 {
1908 Ok(metadata_1.clone())
1909 } else {
1910 error::RegionNotFoundSnafu { region_id }.fail()
1911 }
1912 }),
1913 );
1914
1915 mock_region_server.register_engine(engine.clone());
1916 mock_region_server
1917 .inner
1918 .region_map
1919 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1920
1921 let list_metadata_request = ListMetadataRequest {
1923 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
1924 };
1925 let response = mock_region_server
1926 .handle_list_metadata_request(&list_metadata_request)
1927 .await
1928 .unwrap();
1929 let decoded_metadata: Vec<Option<RegionMetadata>> =
1930 serde_json::from_slice(&response.metadata).unwrap();
1931 assert_eq!(metadatas, decoded_metadata);
1932
1933 mock_region_server
1935 .inner
1936 .region_map
1937 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
1938 let response = mock_region_server
1939 .handle_list_metadata_request(&list_metadata_request)
1940 .await
1941 .unwrap();
1942 let decoded_metadata: Vec<Option<RegionMetadata>> =
1943 serde_json::from_slice(&response.metadata).unwrap();
1944 assert_eq!(metadatas, decoded_metadata);
1945 }
1946
1947 #[tokio::test]
1948 async fn test_handle_list_metadata_failed() {
1949 common_telemetry::init_default_ut_logging();
1950
1951 let mut mock_region_server = mock_region_server();
1952 let region_id_1 = RegionId::new(1, 0);
1953
1954 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
1955 MITO_ENGINE_NAME,
1956 Box::new(move |region_id| {
1957 error::UnexpectedSnafu {
1958 violated: format!("Failed to get region {region_id}"),
1959 }
1960 .fail()
1961 }),
1962 );
1963
1964 mock_region_server.register_engine(engine.clone());
1965 mock_region_server
1966 .inner
1967 .region_map
1968 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
1969
1970 let list_metadata_request = ListMetadataRequest {
1972 region_ids: vec![region_id_1.as_u64()],
1973 };
1974 mock_region_server
1975 .handle_list_metadata_request(&list_metadata_request)
1976 .await
1977 .unwrap_err();
1978 }
1979}