1mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::{Arc, RwLock};
21use std::time::Duration;
22
23use api::region::RegionResponse;
24use api::v1::meta::TopicStat;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27 ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
28};
29use api::v1::{ResponseHeader, Status};
30use arrow_flight::{FlightData, Ticket};
31use async_trait::async_trait;
32use bytes::Bytes;
33use common_error::ext::{BoxedError, ErrorExt};
34use common_error::status_code::StatusCode;
35use common_meta::datanode::TopicStatsReporter;
36use common_query::OutputData;
37use common_query::request::QueryRequest;
38use common_recordbatch::SendableRecordBatchStream;
39use common_runtime::Runtime;
40use common_telemetry::tracing::{self, info_span};
41use common_telemetry::tracing_context::{FutureExt, TracingContext};
42use common_telemetry::{debug, error, info, warn};
43use dashmap::DashMap;
44use datafusion::datasource::TableProvider;
45use datafusion_common::tree_node::TreeNode;
46use futures_util::future::try_join_all;
47use metric_engine::engine::MetricEngine;
48use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
49use prost::Message;
50use query::QueryEngineRef;
51pub use query::dummy_catalog::{
52 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
53};
54use serde_json;
55use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
56use servers::grpc::FlightCompression;
57use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
58use servers::grpc::region_server::RegionServerHandler;
59use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
60use snafu::{OptionExt, ResultExt, ensure};
61use store_api::metric_engine_consts::{
62 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
63};
64use store_api::region_engine::{
65 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
66 SettableRegionRoleState,
67};
68use store_api::region_request::{
69 AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
70 RegionOpenRequest, RegionRequest,
71};
72use store_api::storage::RegionId;
73use tokio::sync::{Semaphore, SemaphorePermit};
74use tokio::time::timeout;
75use tonic::{Request, Response, Result as TonicResult};
76
77use crate::error::{
78 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
79 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
80 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
81 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
82 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
83 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
84};
85use crate::event_listener::RegionServerEventListenerRef;
86use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
87
88#[derive(Clone)]
89pub struct RegionServer {
90 inner: Arc<RegionServerInner>,
91 flight_compression: FlightCompression,
92}
93
94pub struct RegionStat {
95 pub region_id: RegionId,
96 pub engine: String,
97 pub role: RegionRole,
98}
99
100impl RegionServer {
101 pub fn new(
102 query_engine: QueryEngineRef,
103 runtime: Runtime,
104 event_listener: RegionServerEventListenerRef,
105 flight_compression: FlightCompression,
106 ) -> Self {
107 Self::with_table_provider(
108 query_engine,
109 runtime,
110 event_listener,
111 Arc::new(DummyTableProviderFactory),
112 0,
113 Duration::from_millis(0),
114 flight_compression,
115 )
116 }
117
118 pub fn with_table_provider(
119 query_engine: QueryEngineRef,
120 runtime: Runtime,
121 event_listener: RegionServerEventListenerRef,
122 table_provider_factory: TableProviderFactoryRef,
123 max_concurrent_queries: usize,
124 concurrent_query_limiter_timeout: Duration,
125 flight_compression: FlightCompression,
126 ) -> Self {
127 Self {
128 inner: Arc::new(RegionServerInner::new(
129 query_engine,
130 runtime,
131 event_listener,
132 table_provider_factory,
133 RegionServerParallelism::from_opts(
134 max_concurrent_queries,
135 concurrent_query_limiter_timeout,
136 ),
137 )),
138 flight_compression,
139 }
140 }
141
142 pub fn register_engine(&mut self, engine: RegionEngineRef) {
144 self.inner.register_engine(engine);
145 }
146
147 pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
149 self.inner.set_topic_stats_reporter(topic_stats_reporter);
150 }
151
152 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
154 match self.inner.get_engine(region_id, &RegionChange::None) {
155 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
156 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
157 Err(error::Error::RegionNotFound { .. }) => Ok(None),
158 Err(err) => Err(err),
159 }
160 }
161
162 pub fn mito_engine(&self) -> Option<MitoEngine> {
164 if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
165 Some(mito)
166 } else {
167 self.inner
168 .engines
169 .read()
170 .unwrap()
171 .get(MITO_ENGINE_NAME)
172 .cloned()
173 .and_then(|e| {
174 let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
175 if mito.is_none() {
176 warn!("Mito engine not found in region server engines");
177 }
178 mito
179 })
180 }
181 }
182
183 #[tracing::instrument(skip_all)]
184 pub async fn handle_batch_open_requests(
185 &self,
186 parallelism: usize,
187 requests: Vec<(RegionId, RegionOpenRequest)>,
188 ignore_nonexistent_region: bool,
189 ) -> Result<Vec<RegionId>> {
190 self.inner
191 .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
192 .await
193 }
194
195 #[tracing::instrument(skip_all)]
196 pub async fn handle_batch_catchup_requests(
197 &self,
198 parallelism: usize,
199 requests: Vec<(RegionId, RegionCatchupRequest)>,
200 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
201 self.inner
202 .handle_batch_catchup_requests(parallelism, requests)
203 .await
204 }
205
206 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
207 pub async fn handle_request(
208 &self,
209 region_id: RegionId,
210 request: RegionRequest,
211 ) -> Result<RegionResponse> {
212 self.inner.handle_request(region_id, request).await
213 }
214
215 async fn table_provider(
217 &self,
218 region_id: RegionId,
219 ctx: Option<QueryContextRef>,
220 ) -> Result<Arc<dyn TableProvider>> {
221 let status = self
222 .inner
223 .region_map
224 .get(®ion_id)
225 .context(RegionNotFoundSnafu { region_id })?
226 .clone();
227 ensure!(
228 matches!(status, RegionEngineWithStatus::Ready(_)),
229 RegionNotReadySnafu { region_id }
230 );
231
232 self.inner
233 .table_provider_factory
234 .create(region_id, status.into_engine(), ctx)
235 .await
236 .context(ExecuteLogicalPlanSnafu)
237 }
238
239 pub async fn handle_remote_read(
241 &self,
242 request: api::v1::region::QueryRequest,
243 query_ctx: QueryContextRef,
244 ) -> Result<SendableRecordBatchStream> {
245 let _permit = if let Some(p) = &self.inner.parallelism {
246 Some(p.acquire().await?)
247 } else {
248 None
249 };
250
251 let region_id = RegionId::from_u64(request.region_id);
252 let catalog_list = Arc::new(NameAwareCatalogList::new(
253 self.clone(),
254 region_id,
255 query_ctx.clone(),
256 ));
257
258 if query_ctx.explain_verbose() {
259 common_telemetry::info!("Handle remote read for region: {}", region_id);
260 }
261
262 let decoder = self
263 .inner
264 .query_engine
265 .engine_context(query_ctx.clone())
266 .new_plan_decoder()
267 .context(NewPlanDecoderSnafu)?;
268
269 let plan = decoder
270 .decode(Bytes::from(request.plan), catalog_list, false)
271 .await
272 .context(DecodeLogicalPlanSnafu)?;
273
274 self.inner
275 .handle_read(
276 QueryRequest {
277 header: request.header,
278 region_id,
279 plan,
280 },
281 query_ctx,
282 )
283 .await
284 }
285
286 #[tracing::instrument(skip_all)]
287 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
288 let _permit = if let Some(p) = &self.inner.parallelism {
289 Some(p.acquire().await?)
290 } else {
291 None
292 };
293
294 let ctx = request.header.as_ref().map(|h| h.into());
295 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
296
297 let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
298 .context(DataFusionSnafu)?;
299 let mut injector = injector_builder
300 .build(self, request.region_id, query_ctx.clone())
301 .await?;
302
303 let plan = request
304 .plan
305 .rewrite(&mut injector)
306 .context(DataFusionSnafu)?
307 .data;
308
309 self.inner
310 .handle_read(QueryRequest { plan, ..request }, query_ctx)
311 .await
312 }
313
314 pub fn reportable_regions(&self) -> Vec<RegionStat> {
318 self.inner
319 .region_map
320 .iter()
321 .filter_map(|e| {
322 let region_id = *e.key();
323 e.role(region_id).map(|role| RegionStat {
325 region_id,
326 engine: e.value().name().to_string(),
327 role,
328 })
329 })
330 .collect()
331 }
332
333 pub fn topic_stats(&self) -> Vec<TopicStat> {
335 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
336 let Some(reporter) = reporter.as_mut() else {
337 return vec![];
338 };
339 reporter
340 .reportable_topics()
341 .into_iter()
342 .map(|stat| TopicStat {
343 topic_name: stat.topic,
344 record_size: stat.record_size,
345 record_num: stat.record_num,
346 latest_entry_id: stat.latest_entry_id,
347 })
348 .collect()
349 }
350
351 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
352 self.inner.region_map.get(®ion_id).and_then(|engine| {
353 engine.role(region_id).map(|role| match role {
354 RegionRole::Follower => false,
355 RegionRole::Leader => true,
356 RegionRole::DowngradingLeader => true,
357 })
358 })
359 }
360
361 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
362 let engine = self
363 .inner
364 .region_map
365 .get(®ion_id)
366 .with_context(|| RegionNotFoundSnafu { region_id })?;
367 engine
368 .set_region_role(region_id, role)
369 .with_context(|_| HandleRegionRequestSnafu { region_id })
370 }
371
372 pub async fn set_region_role_state_gracefully(
382 &self,
383 region_id: RegionId,
384 state: SettableRegionRoleState,
385 ) -> Result<SetRegionRoleStateResponse> {
386 match self.inner.region_map.get(®ion_id) {
387 Some(engine) => Ok(engine
388 .set_region_role_state_gracefully(region_id, state)
389 .await
390 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
391 None => Ok(SetRegionRoleStateResponse::NotFound),
392 }
393 }
394
395 pub fn runtime(&self) -> Runtime {
396 self.inner.runtime.clone()
397 }
398
399 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
400 match self.inner.region_map.get(®ion_id) {
401 Some(e) => e.region_statistic(region_id),
402 None => None,
403 }
404 }
405
406 pub async fn stop(&self) -> Result<()> {
408 self.inner.stop().await
409 }
410
411 #[cfg(test)]
412 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
414 {
415 let mut engines = self.inner.engines.write().unwrap();
416 if !engines.contains_key(engine.name()) {
417 debug!("Registering test engine: {}", engine.name());
418 engines.insert(engine.name().to_string(), engine.clone());
419 }
420 }
421
422 self.inner
423 .region_map
424 .insert(region_id, RegionEngineWithStatus::Ready(engine));
425 }
426
427 async fn handle_batch_ddl_requests(
428 &self,
429 request: region_request::Body,
430 ) -> Result<RegionResponse> {
431 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
433 .context(BuildRegionRequestsSnafu)?
434 .unwrap();
435 let tracing_context = TracingContext::from_current_span();
436
437 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
438 self.inner
439 .handle_batch_request(batch_request)
440 .trace(span)
441 .await
442 }
443
444 async fn handle_requests_in_parallel(
445 &self,
446 request: region_request::Body,
447 ) -> Result<RegionResponse> {
448 let requests =
449 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
450 let tracing_context = TracingContext::from_current_span();
451
452 let join_tasks = requests.into_iter().map(|(region_id, req)| {
453 let self_to_move = self;
454 let span = tracing_context.attach(info_span!(
455 "RegionServer::handle_region_request",
456 region_id = region_id.to_string()
457 ));
458 async move {
459 self_to_move
460 .handle_request(region_id, req)
461 .trace(span)
462 .await
463 }
464 });
465
466 let results = try_join_all(join_tasks).await?;
467 let mut affected_rows = 0;
468 let mut extensions = HashMap::new();
469 for result in results {
470 affected_rows += result.affected_rows;
471 extensions.extend(result.extensions);
472 }
473
474 Ok(RegionResponse {
475 affected_rows,
476 extensions,
477 metadata: Vec::new(),
478 })
479 }
480
481 async fn handle_requests_in_serial(
482 &self,
483 request: region_request::Body,
484 ) -> Result<RegionResponse> {
485 let requests =
486 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
487 let tracing_context = TracingContext::from_current_span();
488
489 let mut affected_rows = 0;
490 let mut extensions = HashMap::new();
491 for (region_id, req) in requests {
494 let span = tracing_context.attach(info_span!(
495 "RegionServer::handle_region_request",
496 region_id = region_id.to_string()
497 ));
498 let result = self.handle_request(region_id, req).trace(span).await?;
499
500 affected_rows += result.affected_rows;
501 extensions.extend(result.extensions);
502 }
503
504 Ok(RegionResponse {
505 affected_rows,
506 extensions,
507 metadata: Vec::new(),
508 })
509 }
510
511 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
512 let region_id = RegionId::from_u64(request.region_id);
513 let manifest_info = request
514 .manifest_info
515 .context(error::MissingRequiredFieldSnafu {
516 name: "manifest_info",
517 })?;
518
519 let manifest_info = match manifest_info {
520 ManifestInfo::MitoManifestInfo(info) => {
521 RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
522 }
523 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
524 info.data_manifest_version,
525 0,
526 info.metadata_manifest_version,
527 0,
528 ),
529 };
530
531 let tracing_context = TracingContext::from_current_span();
532 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
533
534 self.sync_region(region_id, manifest_info)
535 .trace(span)
536 .await
537 .map(|_| RegionResponse::new(AffectedRows::default()))
538 }
539
540 #[tracing::instrument(skip_all)]
545 async fn handle_list_metadata_request(
546 &self,
547 request: &ListMetadataRequest,
548 ) -> Result<RegionResponse> {
549 let mut region_metadatas = Vec::new();
550 for region_id in &request.region_ids {
552 let region_id = RegionId::from_u64(*region_id);
553 let Some(engine) = self.find_engine(region_id)? else {
555 region_metadatas.push(None);
556 continue;
557 };
558
559 match engine.get_metadata(region_id).await {
560 Ok(metadata) => region_metadatas.push(Some(metadata)),
561 Err(err) => {
562 if err.status_code() == StatusCode::RegionNotFound {
563 region_metadatas.push(None);
564 } else {
565 Err(err).with_context(|_| GetRegionMetadataSnafu {
566 engine: engine.name(),
567 region_id,
568 })?;
569 }
570 }
571 }
572 }
573
574 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
576
577 let response = RegionResponse::from_metadata(json_result);
578
579 Ok(response)
580 }
581
582 pub async fn sync_region(
584 &self,
585 region_id: RegionId,
586 manifest_info: RegionManifestInfo,
587 ) -> Result<()> {
588 let engine_with_status = self
589 .inner
590 .region_map
591 .get(®ion_id)
592 .with_context(|| RegionNotFoundSnafu { region_id })?;
593
594 self.inner
595 .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
596 .await
597 }
598}
599
600#[async_trait]
601impl RegionServerHandler for RegionServer {
602 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
603 let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
604 .with_label_values(&[request.as_ref()]);
605 let response = match &request {
606 region_request::Body::Creates(_)
607 | region_request::Body::Drops(_)
608 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
609 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
610 self.handle_requests_in_parallel(request).await
611 }
612 region_request::Body::Sync(sync_request) => {
613 self.handle_sync_region_request(sync_request).await
614 }
615 region_request::Body::ListMetadata(list_metadata_request) => {
616 self.handle_list_metadata_request(list_metadata_request)
617 .await
618 }
619 _ => self.handle_requests_in_serial(request).await,
620 }
621 .map_err(BoxedError::new)
622 .inspect_err(|_| {
623 failed_requests_cnt.inc();
624 })
625 .context(ExecuteGrpcRequestSnafu)?;
626
627 Ok(RegionResponseV1 {
628 header: Some(ResponseHeader {
629 status: Some(Status {
630 status_code: StatusCode::Success as _,
631 ..Default::default()
632 }),
633 }),
634 affected_rows: response.affected_rows as _,
635 extensions: response.extensions,
636 metadata: response.metadata,
637 })
638 }
639}
640
641#[async_trait]
642impl FlightCraft for RegionServer {
643 async fn do_get(
644 &self,
645 request: Request<Ticket>,
646 ) -> TonicResult<Response<TonicStream<FlightData>>> {
647 let ticket = request.into_inner().ticket;
648 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
649 .context(servers_error::InvalidFlightTicketSnafu)?;
650 let tracing_context = request
651 .header
652 .as_ref()
653 .map(|h| TracingContext::from_w3c(&h.tracing_context))
654 .unwrap_or_default();
655 let query_ctx = request
656 .header
657 .as_ref()
658 .map(|h| Arc::new(QueryContext::from(h)))
659 .unwrap_or(QueryContext::arc());
660
661 let result = self
662 .handle_remote_read(request, query_ctx.clone())
663 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
664 .await?;
665
666 let stream = Box::pin(FlightRecordBatchStream::new(
667 result,
668 tracing_context,
669 self.flight_compression,
670 query_ctx,
671 ));
672 Ok(Response::new(stream))
673 }
674}
675
676#[derive(Clone)]
677enum RegionEngineWithStatus {
678 Registering(RegionEngineRef),
680 Deregistering(RegionEngineRef),
682 Ready(RegionEngineRef),
684}
685
686impl RegionEngineWithStatus {
687 pub fn into_engine(self) -> RegionEngineRef {
689 match self {
690 RegionEngineWithStatus::Registering(engine) => engine,
691 RegionEngineWithStatus::Deregistering(engine) => engine,
692 RegionEngineWithStatus::Ready(engine) => engine,
693 }
694 }
695
696 pub fn engine(&self) -> &RegionEngineRef {
698 match self {
699 RegionEngineWithStatus::Registering(engine) => engine,
700 RegionEngineWithStatus::Deregistering(engine) => engine,
701 RegionEngineWithStatus::Ready(engine) => engine,
702 }
703 }
704}
705
706impl Deref for RegionEngineWithStatus {
707 type Target = RegionEngineRef;
708
709 fn deref(&self) -> &Self::Target {
710 match self {
711 RegionEngineWithStatus::Registering(engine) => engine,
712 RegionEngineWithStatus::Deregistering(engine) => engine,
713 RegionEngineWithStatus::Ready(engine) => engine,
714 }
715 }
716}
717
718struct RegionServerInner {
719 engines: RwLock<HashMap<String, RegionEngineRef>>,
720 region_map: DashMap<RegionId, RegionEngineWithStatus>,
721 query_engine: QueryEngineRef,
722 runtime: Runtime,
723 event_listener: RegionServerEventListenerRef,
724 table_provider_factory: TableProviderFactoryRef,
725 parallelism: Option<RegionServerParallelism>,
728 topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
730 mito_engine: RwLock<Option<MitoEngine>>,
734}
735
736struct RegionServerParallelism {
737 semaphore: Semaphore,
738 timeout: Duration,
739}
740
741impl RegionServerParallelism {
742 pub fn from_opts(
743 max_concurrent_queries: usize,
744 concurrent_query_limiter_timeout: Duration,
745 ) -> Option<Self> {
746 if max_concurrent_queries == 0 {
747 return None;
748 }
749 Some(RegionServerParallelism {
750 semaphore: Semaphore::new(max_concurrent_queries),
751 timeout: concurrent_query_limiter_timeout,
752 })
753 }
754
755 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
756 timeout(self.timeout, self.semaphore.acquire())
757 .await
758 .context(ConcurrentQueryLimiterTimeoutSnafu)?
759 .context(ConcurrentQueryLimiterClosedSnafu)
760 }
761}
762
763enum CurrentEngine {
764 Engine(RegionEngineRef),
765 EarlyReturn(AffectedRows),
766}
767
768impl Debug for CurrentEngine {
769 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
770 match self {
771 CurrentEngine::Engine(engine) => f
772 .debug_struct("CurrentEngine")
773 .field("engine", &engine.name())
774 .finish(),
775 CurrentEngine::EarlyReturn(rows) => f
776 .debug_struct("CurrentEngine")
777 .field("return", rows)
778 .finish(),
779 }
780 }
781}
782
783impl RegionServerInner {
784 pub fn new(
785 query_engine: QueryEngineRef,
786 runtime: Runtime,
787 event_listener: RegionServerEventListenerRef,
788 table_provider_factory: TableProviderFactoryRef,
789 parallelism: Option<RegionServerParallelism>,
790 ) -> Self {
791 Self {
792 engines: RwLock::new(HashMap::new()),
793 region_map: DashMap::new(),
794 query_engine,
795 runtime,
796 event_listener,
797 table_provider_factory,
798 parallelism,
799 topic_stats_reporter: RwLock::new(None),
800 mito_engine: RwLock::new(None),
801 }
802 }
803
804 pub fn register_engine(&self, engine: RegionEngineRef) {
805 let engine_name = engine.name();
806 if engine_name == MITO_ENGINE_NAME
807 && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
808 {
809 *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
810 }
811
812 info!("Region Engine {engine_name} is registered");
813 self.engines
814 .write()
815 .unwrap()
816 .insert(engine_name.to_string(), engine);
817 }
818
819 pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
820 info!("Set topic stats reporter");
821 *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
822 }
823
824 fn get_engine(
825 &self,
826 region_id: RegionId,
827 region_change: &RegionChange,
828 ) -> Result<CurrentEngine> {
829 let current_region_status = self.region_map.get(®ion_id);
830
831 let engine = match region_change {
832 RegionChange::Register(attribute) => match current_region_status {
833 Some(status) => match status.clone() {
834 RegionEngineWithStatus::Registering(engine) => engine,
835 RegionEngineWithStatus::Deregistering(_) => {
836 return error::RegionBusySnafu { region_id }.fail();
837 }
838 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
839 },
840 _ => self
841 .engines
842 .read()
843 .unwrap()
844 .get(attribute.engine())
845 .with_context(|| RegionEngineNotFoundSnafu {
846 name: attribute.engine(),
847 })?
848 .clone(),
849 },
850 RegionChange::Deregisters => match current_region_status {
851 Some(status) => match status.clone() {
852 RegionEngineWithStatus::Registering(_) => {
853 return error::RegionBusySnafu { region_id }.fail();
854 }
855 RegionEngineWithStatus::Deregistering(_) => {
856 return Ok(CurrentEngine::EarlyReturn(0));
857 }
858 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
859 },
860 None => return Ok(CurrentEngine::EarlyReturn(0)),
861 },
862 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
863 match current_region_status {
864 Some(status) => match status.clone() {
865 RegionEngineWithStatus::Registering(_) => {
866 return error::RegionNotReadySnafu { region_id }.fail();
867 }
868 RegionEngineWithStatus::Deregistering(_) => {
869 return error::RegionNotFoundSnafu { region_id }.fail();
870 }
871 RegionEngineWithStatus::Ready(engine) => engine,
872 },
873 None => return error::RegionNotFoundSnafu { region_id }.fail(),
874 }
875 }
876 };
877
878 Ok(CurrentEngine::Engine(engine))
879 }
880
881 async fn handle_batch_open_requests_inner(
882 &self,
883 engine: RegionEngineRef,
884 parallelism: usize,
885 requests: Vec<(RegionId, RegionOpenRequest)>,
886 ignore_nonexistent_region: bool,
887 ) -> Result<Vec<RegionId>> {
888 let region_changes = requests
889 .iter()
890 .map(|(region_id, open)| {
891 let attribute = parse_region_attribute(&open.engine, &open.options)?;
892 Ok((*region_id, RegionChange::Register(attribute)))
893 })
894 .collect::<Result<HashMap<_, _>>>()?;
895
896 for (®ion_id, region_change) in ®ion_changes {
897 self.set_region_status_not_ready(region_id, &engine, region_change)
898 }
899
900 let mut open_regions = Vec::with_capacity(requests.len());
901 let mut errors = vec![];
902 match engine
903 .handle_batch_open_requests(parallelism, requests)
904 .await
905 .with_context(|_| HandleBatchOpenRequestSnafu)
906 {
907 Ok(results) => {
908 for (region_id, result) in results {
909 let region_change = ®ion_changes[®ion_id];
910 match result {
911 Ok(_) => {
912 if let Err(e) = self
913 .set_region_status_ready(region_id, engine.clone(), *region_change)
914 .await
915 {
916 error!(e; "Failed to set region to ready: {}", region_id);
917 errors.push(BoxedError::new(e));
918 } else {
919 open_regions.push(region_id)
920 }
921 }
922 Err(e) => {
923 self.unset_region_status(region_id, &engine, *region_change);
924 if e.status_code() == StatusCode::RegionNotFound
925 && ignore_nonexistent_region
926 {
927 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
928 } else {
929 error!(e; "Failed to open region: {}", region_id);
930 errors.push(e);
931 }
932 }
933 }
934 }
935 }
936 Err(e) => {
937 for (®ion_id, region_change) in ®ion_changes {
938 self.unset_region_status(region_id, &engine, *region_change);
939 }
940 error!(e; "Failed to open batch regions");
941 errors.push(BoxedError::new(e));
942 }
943 }
944
945 if !errors.is_empty() {
946 return error::UnexpectedSnafu {
947 violated: format!("Failed to open batch regions: {:?}", errors[0]),
949 }
950 .fail();
951 }
952
953 Ok(open_regions)
954 }
955
956 pub async fn handle_batch_open_requests(
957 &self,
958 parallelism: usize,
959 requests: Vec<(RegionId, RegionOpenRequest)>,
960 ignore_nonexistent_region: bool,
961 ) -> Result<Vec<RegionId>> {
962 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
963 HashMap::with_capacity(requests.len());
964 for (region_id, request) in requests {
965 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
966 requests.push((region_id, request));
967 } else {
968 engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
969 }
970 }
971
972 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
973 for (engine, requests) in engine_grouped_requests {
974 let engine = self
975 .engines
976 .read()
977 .unwrap()
978 .get(&engine)
979 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
980 .clone();
981 results.push(
982 self.handle_batch_open_requests_inner(
983 engine,
984 parallelism,
985 requests,
986 ignore_nonexistent_region,
987 )
988 .await,
989 )
990 }
991
992 Ok(results
993 .into_iter()
994 .collect::<Result<Vec<_>>>()?
995 .into_iter()
996 .flatten()
997 .collect::<Vec<_>>())
998 }
999
1000 pub async fn handle_batch_catchup_requests_inner(
1001 &self,
1002 engine: RegionEngineRef,
1003 parallelism: usize,
1004 requests: Vec<(RegionId, RegionCatchupRequest)>,
1005 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1006 for (region_id, _) in &requests {
1007 self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1008 }
1009 let region_ids = requests
1010 .iter()
1011 .map(|(region_id, _)| *region_id)
1012 .collect::<Vec<_>>();
1013 let mut responses = Vec::with_capacity(requests.len());
1014 match engine
1015 .handle_batch_catchup_requests(parallelism, requests)
1016 .await
1017 {
1018 Ok(results) => {
1019 for (region_id, result) in results {
1020 match result {
1021 Ok(_) => {
1022 if let Err(e) = self
1023 .set_region_status_ready(
1024 region_id,
1025 engine.clone(),
1026 RegionChange::Catchup,
1027 )
1028 .await
1029 {
1030 error!(e; "Failed to set region to ready: {}", region_id);
1031 responses.push((region_id, Err(BoxedError::new(e))));
1032 } else {
1033 responses.push((region_id, Ok(())));
1034 }
1035 }
1036 Err(e) => {
1037 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1038 error!(e; "Failed to catchup region: {}", region_id);
1039 responses.push((region_id, Err(e)));
1040 }
1041 }
1042 }
1043 }
1044 Err(e) => {
1045 for region_id in region_ids {
1046 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1047 }
1048 error!(e; "Failed to catchup batch regions");
1049 return error::UnexpectedSnafu {
1050 violated: format!("Failed to catchup batch regions: {:?}", e),
1051 }
1052 .fail();
1053 }
1054 }
1055
1056 Ok(responses)
1057 }
1058
1059 pub async fn handle_batch_catchup_requests(
1060 &self,
1061 parallelism: usize,
1062 requests: Vec<(RegionId, RegionCatchupRequest)>,
1063 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1064 let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1065
1066 let mut responses = Vec::with_capacity(requests.len());
1067 for (region_id, request) in requests {
1068 if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1069 match engine {
1070 CurrentEngine::Engine(engine) => {
1071 engine_grouped_requests
1072 .entry(engine.name().to_string())
1073 .or_default()
1074 .push((region_id, request));
1075 }
1076 CurrentEngine::EarlyReturn(_) => {
1077 return error::UnexpectedSnafu {
1078 violated: format!("Unexpected engine type for region {}", region_id),
1079 }
1080 .fail();
1081 }
1082 }
1083 } else {
1084 responses.push((
1085 region_id,
1086 Err(BoxedError::new(
1087 error::RegionNotFoundSnafu { region_id }.build(),
1088 )),
1089 ));
1090 }
1091 }
1092
1093 for (engine, requests) in engine_grouped_requests {
1094 let engine = self
1095 .engines
1096 .read()
1097 .unwrap()
1098 .get(&engine)
1099 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1100 .clone();
1101 responses.extend(
1102 self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1103 .await?,
1104 );
1105 }
1106
1107 Ok(responses)
1108 }
1109
1110 pub async fn handle_batch_request(
1114 &self,
1115 batch_request: BatchRegionDdlRequest,
1116 ) -> Result<RegionResponse> {
1117 let region_changes = match &batch_request {
1118 BatchRegionDdlRequest::Create(requests) => requests
1119 .iter()
1120 .map(|(region_id, create)| {
1121 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1122 Ok((*region_id, RegionChange::Register(attribute)))
1123 })
1124 .collect::<Result<Vec<_>>>()?,
1125 BatchRegionDdlRequest::Drop(requests) => requests
1126 .iter()
1127 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1128 .collect::<Vec<_>>(),
1129 BatchRegionDdlRequest::Alter(requests) => requests
1130 .iter()
1131 .map(|(region_id, _)| (*region_id, RegionChange::None))
1132 .collect::<Vec<_>>(),
1133 };
1134
1135 let (first_region_id, first_region_change) = region_changes.first().unwrap();
1138 let engine = match self.get_engine(*first_region_id, first_region_change)? {
1139 CurrentEngine::Engine(engine) => engine,
1140 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1141 };
1142
1143 for (region_id, region_change) in region_changes.iter() {
1144 self.set_region_status_not_ready(*region_id, &engine, region_change);
1145 }
1146
1147 let ddl_type = batch_request.request_type();
1148 let result = engine
1149 .handle_batch_ddl_requests(batch_request)
1150 .await
1151 .context(HandleBatchDdlRequestSnafu { ddl_type });
1152
1153 match result {
1154 Ok(result) => {
1155 for (region_id, region_change) in ®ion_changes {
1156 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1157 .await?;
1158 }
1159
1160 Ok(RegionResponse {
1161 affected_rows: result.affected_rows,
1162 extensions: result.extensions,
1163 metadata: Vec::new(),
1164 })
1165 }
1166 Err(err) => {
1167 for (region_id, region_change) in region_changes {
1168 self.unset_region_status(region_id, &engine, region_change);
1169 }
1170
1171 Err(err)
1172 }
1173 }
1174 }
1175
1176 pub async fn handle_request(
1177 &self,
1178 region_id: RegionId,
1179 request: RegionRequest,
1180 ) -> Result<RegionResponse> {
1181 let request_type = request.request_type();
1182 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1183 .with_label_values(&[request_type])
1184 .start_timer();
1185
1186 let region_change = match &request {
1187 RegionRequest::Create(create) => {
1188 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1189 RegionChange::Register(attribute)
1190 }
1191 RegionRequest::Open(open) => {
1192 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1193 RegionChange::Register(attribute)
1194 }
1195 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1196 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1197 RegionChange::Ingest
1198 }
1199 RegionRequest::Alter(_)
1200 | RegionRequest::Flush(_)
1201 | RegionRequest::Compact(_)
1202 | RegionRequest::Truncate(_)
1203 | RegionRequest::BuildIndex(_) => RegionChange::None,
1204 RegionRequest::Catchup(_) => RegionChange::Catchup,
1205 };
1206
1207 let engine = match self.get_engine(region_id, ®ion_change)? {
1208 CurrentEngine::Engine(engine) => engine,
1209 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1210 };
1211
1212 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1214
1215 match engine
1216 .handle_request(region_id, request)
1217 .await
1218 .with_context(|_| HandleRegionRequestSnafu { region_id })
1219 {
1220 Ok(result) => {
1221 if matches!(region_change, RegionChange::Ingest) {
1223 crate::metrics::REGION_CHANGED_ROW_COUNT
1224 .with_label_values(&[request_type])
1225 .inc_by(result.affected_rows as u64);
1226 }
1227 self.set_region_status_ready(region_id, engine.clone(), region_change)
1229 .await?;
1230
1231 Ok(RegionResponse {
1232 affected_rows: result.affected_rows,
1233 extensions: result.extensions,
1234 metadata: Vec::new(),
1235 })
1236 }
1237 Err(err) => {
1238 if matches!(region_change, RegionChange::Ingest) {
1239 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1240 .with_label_values(&[request_type])
1241 .inc();
1242 }
1243 self.unset_region_status(region_id, &engine, region_change);
1245 Err(err)
1246 }
1247 }
1248 }
1249
1250 pub async fn handle_sync_region(
1252 &self,
1253 engine: &RegionEngineRef,
1254 region_id: RegionId,
1255 manifest_info: RegionManifestInfo,
1256 ) -> Result<()> {
1257 let Some(new_opened_regions) = engine
1258 .sync_region(region_id, manifest_info)
1259 .await
1260 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1261 .new_opened_logical_region_ids()
1262 else {
1263 warn!("No new opened logical regions");
1264 return Ok(());
1265 };
1266
1267 for region in new_opened_regions {
1268 self.region_map
1269 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1270 info!("Logical region {} is registered!", region);
1271 }
1272
1273 Ok(())
1274 }
1275
1276 fn set_region_status_not_ready(
1277 &self,
1278 region_id: RegionId,
1279 engine: &RegionEngineRef,
1280 region_change: &RegionChange,
1281 ) {
1282 match region_change {
1283 RegionChange::Register(_) => {
1284 self.region_map.insert(
1285 region_id,
1286 RegionEngineWithStatus::Registering(engine.clone()),
1287 );
1288 }
1289 RegionChange::Deregisters => {
1290 self.region_map.insert(
1291 region_id,
1292 RegionEngineWithStatus::Deregistering(engine.clone()),
1293 );
1294 }
1295 _ => {}
1296 }
1297 }
1298
1299 fn unset_region_status(
1300 &self,
1301 region_id: RegionId,
1302 engine: &RegionEngineRef,
1303 region_change: RegionChange,
1304 ) {
1305 match region_change {
1306 RegionChange::None | RegionChange::Ingest => {}
1307 RegionChange::Register(_) => {
1308 self.region_map.remove(®ion_id);
1309 }
1310 RegionChange::Deregisters => {
1311 self.region_map
1312 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1313 }
1314 RegionChange::Catchup => {}
1315 }
1316 }
1317
1318 async fn set_region_status_ready(
1319 &self,
1320 region_id: RegionId,
1321 engine: RegionEngineRef,
1322 region_change: RegionChange,
1323 ) -> Result<()> {
1324 let engine_type = engine.name();
1325 match region_change {
1326 RegionChange::None | RegionChange::Ingest => {}
1327 RegionChange::Register(attribute) => {
1328 info!(
1329 "Region {region_id} is registered to engine {}",
1330 attribute.engine()
1331 );
1332 self.region_map
1333 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1334
1335 match attribute {
1336 RegionAttribute::Metric { physical } => {
1337 if physical {
1338 self.register_logical_regions(&engine, region_id).await?;
1340 self.event_listener.on_region_registered(region_id);
1342 }
1343 }
1344 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1345 RegionAttribute::File => {
1346 }
1348 }
1349 }
1350 RegionChange::Deregisters => {
1351 info!("Region {region_id} is deregistered from engine {engine_type}");
1352 self.region_map
1353 .remove(®ion_id)
1354 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1355 self.event_listener.on_region_deregistered(region_id);
1356 }
1357 RegionChange::Catchup => {
1358 if is_metric_engine(engine.name()) {
1359 self.register_logical_regions(&engine, region_id).await?;
1361 }
1362 }
1363 }
1364 Ok(())
1365 }
1366
1367 async fn register_logical_regions(
1368 &self,
1369 engine: &RegionEngineRef,
1370 physical_region_id: RegionId,
1371 ) -> Result<()> {
1372 let metric_engine =
1373 engine
1374 .as_any()
1375 .downcast_ref::<MetricEngine>()
1376 .context(UnexpectedSnafu {
1377 violated: format!(
1378 "expecting engine type '{}', actual '{}'",
1379 METRIC_ENGINE_NAME,
1380 engine.name(),
1381 ),
1382 })?;
1383
1384 let logical_regions = metric_engine
1385 .logical_regions(physical_region_id)
1386 .await
1387 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1388
1389 for region in logical_regions {
1390 self.region_map
1391 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1392 info!("Logical region {} is registered!", region);
1393 }
1394 Ok(())
1395 }
1396
1397 pub async fn handle_read(
1398 &self,
1399 request: QueryRequest,
1400 query_ctx: QueryContextRef,
1401 ) -> Result<SendableRecordBatchStream> {
1402 let result = self
1405 .query_engine
1406 .execute(request.plan, query_ctx)
1407 .await
1408 .context(ExecuteLogicalPlanSnafu)?;
1409
1410 match result.data {
1411 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1412 UnsupportedOutputSnafu { expected: "stream" }.fail()
1413 }
1414 OutputData::Stream(stream) => Ok(stream),
1415 }
1416 }
1417
1418 async fn stop(&self) -> Result<()> {
1419 let regions = self
1427 .region_map
1428 .iter()
1429 .map(|x| (*x.key(), x.value().clone()))
1430 .collect::<Vec<_>>();
1431 let num_regions = regions.len();
1432
1433 for (region_id, engine) in regions {
1434 let closed = engine
1435 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1436 .await;
1437 match closed {
1438 Ok(_) => debug!("Region {region_id} is closed"),
1439 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1440 }
1441 }
1442 self.region_map.clear();
1443 info!("closed {num_regions} regions");
1444
1445 drop(self.mito_engine.write().unwrap().take());
1446 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1447 for (engine_name, engine) in engines {
1448 engine
1449 .stop()
1450 .await
1451 .context(StopRegionEngineSnafu { name: &engine_name })?;
1452 info!("Region engine {engine_name} is stopped");
1453 }
1454
1455 Ok(())
1456 }
1457}
1458
1459#[derive(Debug, Clone, Copy)]
1460enum RegionChange {
1461 None,
1462 Register(RegionAttribute),
1463 Deregisters,
1464 Catchup,
1465 Ingest,
1466}
1467
1468fn is_metric_engine(engine: &str) -> bool {
1469 engine == METRIC_ENGINE_NAME
1470}
1471
1472fn parse_region_attribute(
1473 engine: &str,
1474 options: &HashMap<String, String>,
1475) -> Result<RegionAttribute> {
1476 match engine {
1477 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1478 METRIC_ENGINE_NAME => {
1479 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1480
1481 Ok(RegionAttribute::Metric { physical })
1482 }
1483 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1484 _ => error::UnexpectedSnafu {
1485 violated: format!("Unknown engine: {}", engine),
1486 }
1487 .fail(),
1488 }
1489}
1490
/// The engine a region is registered with, plus engine-specific registration details.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A mito engine region.
    Mito,
    /// A metric engine region; `physical` is `false` for logical regions.
    Metric { physical: bool },
    /// A file engine region.
    File,
}
1497
1498impl RegionAttribute {
1499 fn engine(&self) -> &'static str {
1500 match self {
1501 RegionAttribute::Mito => MITO_ENGINE_NAME,
1502 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1503 RegionAttribute::File => FILE_ENGINE_NAME,
1504 }
1505 }
1506}
1507
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use api::v1::SemanticType;
    use common_error::ext::ErrorExt;
    use datatypes::prelude::ConcreteDataType;
    use mito2::test_util::CreateRequestBuilder;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{
        PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
    };
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Result;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// A region in `Registering` state accepts create and open requests and ends up `Ready`.
    #[tokio::test]
    async fn test_region_registering() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
        let engine_name = engine.name();
        mock_region_server.register_engine(engine.clone());
        let region_id = RegionId::new(1, 1);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        // Simulate a region that is already registering.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Registering(engine.clone()),
        );
        let response = mock_region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));

        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Registering(engine.clone()),
        );
        let response = mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Open(RegionOpenRequest {
                    engine: engine_name.to_string(),
                    table_dir: String::new(),
                    path_type: PathType::Bare,
                    options: Default::default(),
                    skip_wal_replay: false,
                    checkpoint: None,
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
    }

    /// Drop and close requests on a region that is already `Deregistering` return zero
    /// affected rows and leave the status unchanged.
    #[tokio::test]
    async fn test_region_deregistering() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1, 1);

        // Simulate a region that is already deregistering.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Deregistering(engine.clone()),
        );

        let response = mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);

        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));

        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Deregistering(engine.clone()),
        );

        let response = mock_region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);

        let status = mock_region_server
            .inner
            .region_map
            .get(&region_id)
            .unwrap()
            .clone();
        assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
    }

    /// While a region is registering, a request that does not change registration is rejected
    /// with `RegionNotReady`.
    #[tokio::test]
    async fn test_region_not_ready() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1, 1);

        // Simulate a region that is still registering.
        mock_region_server.inner.region_map.insert(
            region_id,
            RegionEngineWithStatus::Registering(engine.clone()),
        );

        let err = mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Truncate(RegionTruncateRequest::All),
            )
            .await
            .unwrap_err();

        assert_eq!(err.status_code(), StatusCode::RegionNotReady);
    }

    /// Engine failures roll back status changes: a failed create leaves no entry, and a failed
    /// drop keeps the region in the map.
    #[tokio::test]
    async fn test_region_request_failed() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
            MITO_ENGINE_NAME,
            Box::new(|_region_id, _request| {
                error::UnexpectedSnafu {
                    violated: "test".to_string(),
                }
                .fail()
            }),
        );

        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1, 1);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        mock_region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap_err();

        let status = mock_region_server.inner.region_map.get(&region_id);
        assert!(status.is_none());

        mock_region_server
            .inner
            .region_map
            .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));

        mock_region_server
            .handle_request(
                region_id,
                RegionRequest::Drop(RegionDropRequest { fast_path: false }),
            )
            .await
            .unwrap_err();

        let status = mock_region_server.inner.region_map.get(&region_id);
        assert!(status.is_some());
    }

    /// When the trailing flag is `true`, a region the engine reports as not found is left out
    /// of the opened regions. When it is `false`, the batch open fails.
    #[tokio::test]
    async fn test_batch_open_region_ignore_nonexistent_regions() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (engine, _receiver) = MockRegionEngine::with_mock_fn(
            MITO_ENGINE_NAME,
            Box::new(|region_id, _request| {
                if region_id == RegionId::new(1, 1) {
                    error::RegionNotFoundSnafu { region_id }.fail()
                } else {
                    Ok(0)
                }
            }),
        );
        mock_region_server.register_engine(engine.clone());

        let region_ids = mock_region_server
            .handle_batch_open_requests(
                8,
                vec![
                    (
                        RegionId::new(1, 1),
                        RegionOpenRequest {
                            engine: MITO_ENGINE_NAME.to_string(),
                            table_dir: String::new(),
                            path_type: PathType::Bare,
                            options: Default::default(),
                            skip_wal_replay: false,
                            checkpoint: None,
                        },
                    ),
                    (
                        RegionId::new(1, 2),
                        RegionOpenRequest {
                            engine: MITO_ENGINE_NAME.to_string(),
                            table_dir: String::new(),
                            path_type: PathType::Bare,
                            options: Default::default(),
                            skip_wal_replay: false,
                            checkpoint: None,
                        },
                    ),
                ],
                true,
            )
            .await
            .unwrap();
        assert_eq!(region_ids, vec![RegionId::new(1, 2)]);

        let err = mock_region_server
            .handle_batch_open_requests(
                8,
                vec![
                    (
                        RegionId::new(1, 1),
                        RegionOpenRequest {
                            engine: MITO_ENGINE_NAME.to_string(),
                            table_dir: String::new(),
                            path_type: PathType::Bare,
                            options: Default::default(),
                            skip_wal_replay: false,
                            checkpoint: None,
                        },
                    ),
                    (
                        RegionId::new(1, 2),
                        RegionOpenRequest {
                            engine: MITO_ENGINE_NAME.to_string(),
                            table_dir: String::new(),
                            path_type: PathType::Bare,
                            options: Default::default(),
                            skip_wal_replay: false,
                            checkpoint: None,
                        },
                    ),
                ],
                false,
            )
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::Unexpected);
    }

    /// One `get_engine` scenario: the region's pre-existing status (`None` means absent from
    /// the map), the requested change, and a check on the result.
    struct CurrentEngineTest {
        region_id: RegionId,
        current_region_status: Option<RegionEngineWithStatus>,
        region_change: RegionChange,
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }

    /// Table-driven check of how `get_engine` reacts to each (current status, requested
    /// change) pair.
    #[tokio::test]
    async fn test_current_engine() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(engine.clone());

        let region_id = RegionId::new(1024, 1);
        let tests = vec![
            // RegionChange::None
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotReady);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::None,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionNotFound);
                }),
            },
            // RegionChange::Register
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Register(RegionAttribute::Mito),
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
            // RegionChange::Deregisters
            CurrentEngineTest {
                region_id,
                current_region_status: None,
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let err = result.unwrap_err();
                    assert_eq!(err.status_code(), StatusCode::RegionBusy);
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
                }),
            },
            CurrentEngineTest {
                region_id,
                current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
                region_change: RegionChange::Deregisters,
                assert: Box::new(|result| {
                    let current_engine = result.unwrap();
                    assert_matches!(current_engine, CurrentEngine::Engine(_));
                }),
            },
        ];

        for test in tests {
            let CurrentEngineTest {
                region_id,
                current_region_status,
                region_change,
                assert,
            } = test;

            // Seed (or clear) the region's current status for this case.
            if let Some(status) = current_region_status {
                mock_region_server
                    .inner
                    .region_map
                    .insert(region_id, status);
            } else {
                mock_region_server.inner.region_map.remove(&region_id);
            }

            let result = mock_region_server
                .inner
                .get_engine(region_id, &region_change);

            assert(result);
        }
    }

    /// With two permits and a 1 ms timeout, a third concurrent acquire times out until a
    /// permit is released.
    #[tokio::test]
    async fn test_region_server_parallelism() {
        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
        let first_query = p.acquire().await;
        assert!(first_query.is_ok());
        let second_query = p.acquire().await;
        assert!(second_query.is_ok());
        let third_query = p.acquire().await;
        assert!(third_query.is_err());
        let err = third_query.unwrap_err();
        assert_eq!(
            err.output_msg(),
            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
        );
        drop(first_query);
        let fourth_query = p.acquire().await;
        assert!(fourth_query.is_ok());
    }

    /// Builds region metadata with a nanosecond `timestamp` time index, a `file` tag used as
    /// the primary key, and a `message` field.
    fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
        let mut metadata_builder = RegionMetadataBuilder::new(region_id);
        metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: datatypes::schema::ColumnSchema::new(
                "timestamp",
                ConcreteDataType::timestamp_nanosecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 0,
        });
        metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: datatypes::schema::ColumnSchema::new(
                "file",
                ConcreteDataType::string_datatype(),
                true,
            ),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        });
        metadata_builder.push_column_metadata(ColumnMetadata {
            column_schema: datatypes::schema::ColumnSchema::new(
                "message",
                ConcreteDataType::string_datatype(),
                true,
            ),
            semantic_type: SemanticType::Field,
            column_id: 2,
        });
        metadata_builder.primary_key(vec![1]);
        metadata_builder.build().unwrap()
    }

    /// Metadata for every requested region is returned JSON-encoded, in request order.
    #[tokio::test]
    async fn test_handle_list_metadata_request() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let region_id_1 = RegionId::new(1, 0);
        let region_id_2 = RegionId::new(2, 0);

        let metadata_1 = mock_region_metadata(region_id_1);
        let metadata_2 = mock_region_metadata(region_id_2);
        let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];

        let metadata_1 = Arc::new(metadata_1);
        let metadata_2 = Arc::new(metadata_2);
        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
            MITO_ENGINE_NAME,
            Box::new(move |region_id| {
                if region_id == region_id_1 {
                    Ok(metadata_1.clone())
                } else if region_id == region_id_2 {
                    Ok(metadata_2.clone())
                } else {
                    error::RegionNotFoundSnafu { region_id }.fail()
                }
            }),
        );

        mock_region_server.register_engine(engine.clone());
        mock_region_server
            .inner
            .region_map
            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
        mock_region_server
            .inner
            .region_map
            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));

        let list_metadata_request = ListMetadataRequest {
            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
        };
        let response = mock_region_server
            .handle_list_metadata_request(&list_metadata_request)
            .await
            .unwrap();
        let decoded_metadata: Vec<Option<RegionMetadata>> =
            serde_json::from_slice(&response.metadata).unwrap();
        assert_eq!(metadatas, decoded_metadata);
    }

    /// A region missing from the region map, or one the engine reports as not found, yields
    /// a `None` entry rather than an error.
    #[tokio::test]
    async fn test_handle_list_metadata_not_found() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let region_id_1 = RegionId::new(1, 0);
        let region_id_2 = RegionId::new(2, 0);

        let metadata_1 = mock_region_metadata(region_id_1);
        let metadatas = vec![Some(metadata_1.clone()), None];

        let metadata_1 = Arc::new(metadata_1);
        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
            MITO_ENGINE_NAME,
            Box::new(move |region_id| {
                if region_id == region_id_1 {
                    Ok(metadata_1.clone())
                } else {
                    error::RegionNotFoundSnafu { region_id }.fail()
                }
            }),
        );

        mock_region_server.register_engine(engine.clone());
        mock_region_server
            .inner
            .region_map
            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));

        // `region_id_2` is not in the region map.
        let list_metadata_request = ListMetadataRequest {
            region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
        };
        let response = mock_region_server
            .handle_list_metadata_request(&list_metadata_request)
            .await
            .unwrap();
        let decoded_metadata: Vec<Option<RegionMetadata>> =
            serde_json::from_slice(&response.metadata).unwrap();
        assert_eq!(metadatas, decoded_metadata);

        // `region_id_2` is now in the map, but the engine reports it as not found.
        mock_region_server
            .inner
            .region_map
            .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
        let response = mock_region_server
            .handle_list_metadata_request(&list_metadata_request)
            .await
            .unwrap();
        let decoded_metadata: Vec<Option<RegionMetadata>> =
            serde_json::from_slice(&response.metadata).unwrap();
        assert_eq!(metadatas, decoded_metadata);
    }

    /// An engine error other than not-found fails the whole list-metadata request.
    #[tokio::test]
    async fn test_handle_list_metadata_failed() {
        common_telemetry::init_default_ut_logging();

        let mut mock_region_server = mock_region_server();
        let region_id_1 = RegionId::new(1, 0);

        let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
            MITO_ENGINE_NAME,
            Box::new(move |region_id| {
                error::UnexpectedSnafu {
                    violated: format!("Failed to get region {region_id}"),
                }
                .fail()
            }),
        );

        mock_region_server.register_engine(engine.clone());
        mock_region_server
            .inner
            .region_map
            .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));

        let list_metadata_request = ListMetadataRequest {
            region_ids: vec![region_id_1.as_u64()],
        };
        mock_region_server
            .handle_list_metadata_request(&list_metadata_request)
            .await
            .unwrap_err();
    }
}