1mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28 ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use futures_util::future::try_join_all;
48use metric_engine::engine::MetricEngine;
49use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
50use prost::Message;
51use query::QueryEngineRef;
52pub use query::dummy_catalog::{
53 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
54};
55use serde_json;
56use servers::error::{
57 self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
58};
59use servers::grpc::FlightCompression;
60use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
61use servers::grpc::region_server::RegionServerHandler;
62use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
63use snafu::{OptionExt, ResultExt, ensure};
64use store_api::metric_engine_consts::{
65 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
66};
67use store_api::region_engine::{
68 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
69 SettableRegionRoleState,
70};
71use store_api::region_request::{
72 AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
73 RegionOpenRequest, RegionRequest,
74};
75use store_api::storage::RegionId;
76use tokio::sync::{Semaphore, SemaphorePermit};
77use tokio::time::timeout;
78use tonic::{Request, Response, Result as TonicResult};
79
80use crate::error::{
81 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
82 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
83 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
84 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
85 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
86 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
87};
88use crate::event_listener::RegionServerEventListenerRef;
89use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
90
/// Cloneable handle to the region server.
///
/// Clones share the same [`RegionServerInner`] and the same suspend flag,
/// since both are held behind an [`Arc`].
#[derive(Clone)]
pub struct RegionServer {
    /// Shared state: registered engines, the region map and query facilities.
    inner: Arc<RegionServerInner>,
    /// Compression setting passed to the Flight record batch stream in `do_get`.
    flight_compression: FlightCompression,
    /// While `true`, `do_get` rejects incoming reads with a `Suspended` error.
    suspend: Arc<AtomicBool>,
}
97
98pub struct RegionStat {
99 pub region_id: RegionId,
100 pub engine: String,
101 pub role: RegionRole,
102}
103
104impl RegionServer {
105 pub fn new(
106 query_engine: QueryEngineRef,
107 runtime: Runtime,
108 event_listener: RegionServerEventListenerRef,
109 flight_compression: FlightCompression,
110 ) -> Self {
111 Self::with_table_provider(
112 query_engine,
113 runtime,
114 event_listener,
115 Arc::new(DummyTableProviderFactory),
116 0,
117 Duration::from_millis(0),
118 flight_compression,
119 )
120 }
121
122 pub fn with_table_provider(
123 query_engine: QueryEngineRef,
124 runtime: Runtime,
125 event_listener: RegionServerEventListenerRef,
126 table_provider_factory: TableProviderFactoryRef,
127 max_concurrent_queries: usize,
128 concurrent_query_limiter_timeout: Duration,
129 flight_compression: FlightCompression,
130 ) -> Self {
131 Self {
132 inner: Arc::new(RegionServerInner::new(
133 query_engine,
134 runtime,
135 event_listener,
136 table_provider_factory,
137 RegionServerParallelism::from_opts(
138 max_concurrent_queries,
139 concurrent_query_limiter_timeout,
140 ),
141 )),
142 flight_compression,
143 suspend: Arc::new(AtomicBool::new(false)),
144 }
145 }
146
147 pub fn register_engine(&mut self, engine: RegionEngineRef) {
149 self.inner.register_engine(engine);
150 }
151
152 pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
154 self.inner.set_topic_stats_reporter(topic_stats_reporter);
155 }
156
157 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
159 match self.inner.get_engine(region_id, &RegionChange::None) {
160 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
161 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
162 Err(error::Error::RegionNotFound { .. }) => Ok(None),
163 Err(err) => Err(err),
164 }
165 }
166
167 pub fn mito_engine(&self) -> Option<MitoEngine> {
169 if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
170 Some(mito)
171 } else {
172 self.inner
173 .engines
174 .read()
175 .unwrap()
176 .get(MITO_ENGINE_NAME)
177 .cloned()
178 .and_then(|e| {
179 let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
180 if mito.is_none() {
181 warn!("Mito engine not found in region server engines");
182 }
183 mito
184 })
185 }
186 }
187
188 #[tracing::instrument(skip_all)]
189 pub async fn handle_batch_open_requests(
190 &self,
191 parallelism: usize,
192 requests: Vec<(RegionId, RegionOpenRequest)>,
193 ignore_nonexistent_region: bool,
194 ) -> Result<Vec<RegionId>> {
195 self.inner
196 .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
197 .await
198 }
199
200 #[tracing::instrument(skip_all)]
201 pub async fn handle_batch_catchup_requests(
202 &self,
203 parallelism: usize,
204 requests: Vec<(RegionId, RegionCatchupRequest)>,
205 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
206 self.inner
207 .handle_batch_catchup_requests(parallelism, requests)
208 .await
209 }
210
211 #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
212 pub async fn handle_request(
213 &self,
214 region_id: RegionId,
215 request: RegionRequest,
216 ) -> Result<RegionResponse> {
217 self.inner.handle_request(region_id, request).await
218 }
219
220 async fn table_provider(
222 &self,
223 region_id: RegionId,
224 ctx: Option<QueryContextRef>,
225 ) -> Result<Arc<dyn TableProvider>> {
226 let status = self
227 .inner
228 .region_map
229 .get(®ion_id)
230 .context(RegionNotFoundSnafu { region_id })?
231 .clone();
232 ensure!(
233 matches!(status, RegionEngineWithStatus::Ready(_)),
234 RegionNotReadySnafu { region_id }
235 );
236
237 self.inner
238 .table_provider_factory
239 .create(region_id, status.into_engine(), ctx)
240 .await
241 .context(ExecuteLogicalPlanSnafu)
242 }
243
244 pub async fn handle_remote_read(
246 &self,
247 request: api::v1::region::QueryRequest,
248 query_ctx: QueryContextRef,
249 ) -> Result<SendableRecordBatchStream> {
250 let _permit = if let Some(p) = &self.inner.parallelism {
251 Some(p.acquire().await?)
252 } else {
253 None
254 };
255
256 let region_id = RegionId::from_u64(request.region_id);
257 let catalog_list = Arc::new(NameAwareCatalogList::new(
258 self.clone(),
259 region_id,
260 query_ctx.clone(),
261 ));
262
263 if query_ctx.explain_verbose() {
264 common_telemetry::info!("Handle remote read for region: {}", region_id);
265 }
266
267 let decoder = self
268 .inner
269 .query_engine
270 .engine_context(query_ctx.clone())
271 .new_plan_decoder()
272 .context(NewPlanDecoderSnafu)?;
273
274 let plan = decoder
275 .decode(Bytes::from(request.plan), catalog_list, false)
276 .await
277 .context(DecodeLogicalPlanSnafu)?;
278
279 self.inner
280 .handle_read(
281 QueryRequest {
282 header: request.header,
283 region_id,
284 plan,
285 },
286 query_ctx,
287 )
288 .await
289 }
290
291 #[tracing::instrument(skip_all)]
292 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
293 let _permit = if let Some(p) = &self.inner.parallelism {
294 Some(p.acquire().await?)
295 } else {
296 None
297 };
298
299 let ctx = request.header.as_ref().map(|h| h.into());
300 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
301
302 let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
303 .context(DataFusionSnafu)?;
304 let mut injector = injector_builder
305 .build(self, request.region_id, query_ctx.clone())
306 .await?;
307
308 let plan = request
309 .plan
310 .rewrite(&mut injector)
311 .context(DataFusionSnafu)?
312 .data;
313
314 self.inner
315 .handle_read(QueryRequest { plan, ..request }, query_ctx)
316 .await
317 }
318
319 pub fn reportable_regions(&self) -> Vec<RegionStat> {
323 self.inner
324 .region_map
325 .iter()
326 .filter_map(|e| {
327 let region_id = *e.key();
328 e.role(region_id).map(|role| RegionStat {
330 region_id,
331 engine: e.value().name().to_string(),
332 role,
333 })
334 })
335 .collect()
336 }
337
338 pub fn topic_stats(&self) -> Vec<TopicStat> {
340 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
341 let Some(reporter) = reporter.as_mut() else {
342 return vec![];
343 };
344 reporter
345 .reportable_topics()
346 .into_iter()
347 .map(|stat| TopicStat {
348 topic_name: stat.topic,
349 record_size: stat.record_size,
350 record_num: stat.record_num,
351 latest_entry_id: stat.latest_entry_id,
352 })
353 .collect()
354 }
355
356 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
357 self.inner.region_map.get(®ion_id).and_then(|engine| {
358 engine.role(region_id).map(|role| match role {
359 RegionRole::Follower => false,
360 RegionRole::Leader => true,
361 RegionRole::DowngradingLeader => true,
362 })
363 })
364 }
365
366 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
367 let engine = self
368 .inner
369 .region_map
370 .get(®ion_id)
371 .with_context(|| RegionNotFoundSnafu { region_id })?;
372 engine
373 .set_region_role(region_id, role)
374 .with_context(|_| HandleRegionRequestSnafu { region_id })
375 }
376
377 pub async fn set_region_role_state_gracefully(
387 &self,
388 region_id: RegionId,
389 state: SettableRegionRoleState,
390 ) -> Result<SetRegionRoleStateResponse> {
391 match self.inner.region_map.get(®ion_id) {
392 Some(engine) => Ok(engine
393 .set_region_role_state_gracefully(region_id, state)
394 .await
395 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
396 None => Ok(SetRegionRoleStateResponse::NotFound),
397 }
398 }
399
400 pub fn runtime(&self) -> Runtime {
401 self.inner.runtime.clone()
402 }
403
404 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
405 match self.inner.region_map.get(®ion_id) {
406 Some(e) => e.region_statistic(region_id),
407 None => None,
408 }
409 }
410
411 pub async fn stop(&self) -> Result<()> {
413 self.inner.stop().await
414 }
415
416 #[cfg(test)]
417 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
419 {
420 let mut engines = self.inner.engines.write().unwrap();
421 if !engines.contains_key(engine.name()) {
422 debug!("Registering test engine: {}", engine.name());
423 engines.insert(engine.name().to_string(), engine.clone());
424 }
425 }
426
427 self.inner
428 .region_map
429 .insert(region_id, RegionEngineWithStatus::Ready(engine));
430 }
431
432 async fn handle_batch_ddl_requests(
433 &self,
434 request: region_request::Body,
435 ) -> Result<RegionResponse> {
436 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
438 .context(BuildRegionRequestsSnafu)?
439 .unwrap();
440 let tracing_context = TracingContext::from_current_span();
441
442 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
443 self.inner
444 .handle_batch_request(batch_request)
445 .trace(span)
446 .await
447 }
448
449 async fn handle_requests_in_parallel(
450 &self,
451 request: region_request::Body,
452 ) -> Result<RegionResponse> {
453 let requests =
454 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
455 let tracing_context = TracingContext::from_current_span();
456
457 let join_tasks = requests.into_iter().map(|(region_id, req)| {
458 let self_to_move = self;
459 let span = tracing_context.attach(info_span!(
460 "RegionServer::handle_region_request",
461 region_id = region_id.to_string()
462 ));
463 async move {
464 self_to_move
465 .handle_request(region_id, req)
466 .trace(span)
467 .await
468 }
469 });
470
471 let results = try_join_all(join_tasks).await?;
472 let mut affected_rows = 0;
473 let mut extensions = HashMap::new();
474 for result in results {
475 affected_rows += result.affected_rows;
476 extensions.extend(result.extensions);
477 }
478
479 Ok(RegionResponse {
480 affected_rows,
481 extensions,
482 metadata: Vec::new(),
483 })
484 }
485
486 async fn handle_requests_in_serial(
487 &self,
488 request: region_request::Body,
489 ) -> Result<RegionResponse> {
490 let requests =
491 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
492 let tracing_context = TracingContext::from_current_span();
493
494 let mut affected_rows = 0;
495 let mut extensions = HashMap::new();
496 for (region_id, req) in requests {
499 let span = tracing_context.attach(info_span!(
500 "RegionServer::handle_region_request",
501 region_id = region_id.to_string()
502 ));
503 let result = self.handle_request(region_id, req).trace(span).await?;
504
505 affected_rows += result.affected_rows;
506 extensions.extend(result.extensions);
507 }
508
509 Ok(RegionResponse {
510 affected_rows,
511 extensions,
512 metadata: Vec::new(),
513 })
514 }
515
516 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
517 let region_id = RegionId::from_u64(request.region_id);
518 let manifest_info = request
519 .manifest_info
520 .context(error::MissingRequiredFieldSnafu {
521 name: "manifest_info",
522 })?;
523
524 let manifest_info = match manifest_info {
525 ManifestInfo::MitoManifestInfo(info) => {
526 RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
527 }
528 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
529 info.data_manifest_version,
530 0,
531 info.metadata_manifest_version,
532 0,
533 ),
534 };
535
536 let tracing_context = TracingContext::from_current_span();
537 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
538
539 self.sync_region(region_id, manifest_info)
540 .trace(span)
541 .await
542 .map(|_| RegionResponse::new(AffectedRows::default()))
543 }
544
545 #[tracing::instrument(skip_all)]
550 async fn handle_list_metadata_request(
551 &self,
552 request: &ListMetadataRequest,
553 ) -> Result<RegionResponse> {
554 let mut region_metadatas = Vec::new();
555 for region_id in &request.region_ids {
557 let region_id = RegionId::from_u64(*region_id);
558 let Some(engine) = self.find_engine(region_id)? else {
560 region_metadatas.push(None);
561 continue;
562 };
563
564 match engine.get_metadata(region_id).await {
565 Ok(metadata) => region_metadatas.push(Some(metadata)),
566 Err(err) => {
567 if err.status_code() == StatusCode::RegionNotFound {
568 region_metadatas.push(None);
569 } else {
570 Err(err).with_context(|_| GetRegionMetadataSnafu {
571 engine: engine.name(),
572 region_id,
573 })?;
574 }
575 }
576 }
577 }
578
579 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
581
582 let response = RegionResponse::from_metadata(json_result);
583
584 Ok(response)
585 }
586
587 pub async fn sync_region(
589 &self,
590 region_id: RegionId,
591 manifest_info: RegionManifestInfo,
592 ) -> Result<()> {
593 let engine_with_status = self
594 .inner
595 .region_map
596 .get(®ion_id)
597 .with_context(|| RegionNotFoundSnafu { region_id })?;
598
599 self.inner
600 .handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
601 .await
602 }
603
604 fn is_suspended(&self) -> bool {
605 self.suspend.load(Ordering::Relaxed)
606 }
607
608 pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
609 self.suspend.clone()
610 }
611}
612
613#[async_trait]
614impl RegionServerHandler for RegionServer {
615 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
616 let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
617 .with_label_values(&[request.as_ref()]);
618 let response = match &request {
619 region_request::Body::Creates(_)
620 | region_request::Body::Drops(_)
621 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
622 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
623 self.handle_requests_in_parallel(request).await
624 }
625 region_request::Body::Sync(sync_request) => {
626 self.handle_sync_region_request(sync_request).await
627 }
628 region_request::Body::ListMetadata(list_metadata_request) => {
629 self.handle_list_metadata_request(list_metadata_request)
630 .await
631 }
632 _ => self.handle_requests_in_serial(request).await,
633 }
634 .map_err(BoxedError::new)
635 .inspect_err(|_| {
636 failed_requests_cnt.inc();
637 })
638 .context(ExecuteGrpcRequestSnafu)?;
639
640 Ok(RegionResponseV1 {
641 header: Some(ResponseHeader {
642 status: Some(Status {
643 status_code: StatusCode::Success as _,
644 ..Default::default()
645 }),
646 }),
647 affected_rows: response.affected_rows as _,
648 extensions: response.extensions,
649 metadata: response.metadata,
650 })
651 }
652}
653
654#[async_trait]
655impl FlightCraft for RegionServer {
656 async fn do_get(
657 &self,
658 request: Request<Ticket>,
659 ) -> TonicResult<Response<TonicStream<FlightData>>> {
660 ensure!(!self.is_suspended(), SuspendedSnafu);
661
662 let ticket = request.into_inner().ticket;
663 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
664 .context(servers_error::InvalidFlightTicketSnafu)?;
665 let tracing_context = request
666 .header
667 .as_ref()
668 .map(|h| TracingContext::from_w3c(&h.tracing_context))
669 .unwrap_or_default();
670 let query_ctx = request
671 .header
672 .as_ref()
673 .map(|h| Arc::new(QueryContext::from(h)))
674 .unwrap_or(QueryContext::arc());
675
676 let result = self
677 .handle_remote_read(request, query_ctx.clone())
678 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
679 .await?;
680
681 let stream = Box::pin(FlightRecordBatchStream::new(
682 result,
683 tracing_context,
684 self.flight_compression,
685 query_ctx,
686 ));
687 Ok(Response::new(stream))
688 }
689}
690
/// A region's engine together with the region's lifecycle status.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// The region is being registered; plain access is answered with
    /// `RegionNotReady` and deregistration with `RegionBusy`.
    Registering(RegionEngineRef),
    /// The region is being deregistered; plain access is answered with
    /// `RegionNotFound` and registration with `RegionBusy`.
    Deregistering(RegionEngineRef),
    /// The region is ready and its engine is handed out to callers.
    Ready(RegionEngineRef),
}
700
701impl RegionEngineWithStatus {
702 pub fn into_engine(self) -> RegionEngineRef {
704 match self {
705 RegionEngineWithStatus::Registering(engine) => engine,
706 RegionEngineWithStatus::Deregistering(engine) => engine,
707 RegionEngineWithStatus::Ready(engine) => engine,
708 }
709 }
710
711 pub fn engine(&self) -> &RegionEngineRef {
713 match self {
714 RegionEngineWithStatus::Registering(engine) => engine,
715 RegionEngineWithStatus::Deregistering(engine) => engine,
716 RegionEngineWithStatus::Ready(engine) => engine,
717 }
718 }
719}
720
721impl Deref for RegionEngineWithStatus {
722 type Target = RegionEngineRef;
723
724 fn deref(&self) -> &Self::Target {
725 match self {
726 RegionEngineWithStatus::Registering(engine) => engine,
727 RegionEngineWithStatus::Deregistering(engine) => engine,
728 RegionEngineWithStatus::Ready(engine) => engine,
729 }
730 }
731}
732
/// State shared by all clones of [`RegionServer`].
struct RegionServerInner {
    /// Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Known regions, each mapped to its engine and lifecycle status.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine; used to create the plan decoder for remote reads.
    query_engine: QueryEngineRef,
    /// Runtime handed out by `RegionServer::runtime`.
    runtime: Runtime,
    /// Listener supplied at construction time.
    event_listener: RegionServerEventListenerRef,
    /// Factory used to create table providers for ready regions.
    table_provider_factory: TableProviderFactoryRef,
    /// Concurrent query limiter; `None` when no limit is configured.
    parallelism: Option<RegionServerParallelism>,
    /// Optional reporter backing `RegionServer::topic_stats`.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// Mito engine cached when an engine named `MITO_ENGINE_NAME` that
    /// downcasts to `MitoEngine` is registered.
    mito_engine: RwLock<Option<MitoEngine>>,
}
750
/// Limiter for the number of concurrently running queries.
struct RegionServerParallelism {
    /// Semaphore sized to the maximum number of concurrent queries.
    semaphore: Semaphore,
    /// Maximum time to wait for a permit before giving up.
    timeout: Duration,
}
755
756impl RegionServerParallelism {
757 pub fn from_opts(
758 max_concurrent_queries: usize,
759 concurrent_query_limiter_timeout: Duration,
760 ) -> Option<Self> {
761 if max_concurrent_queries == 0 {
762 return None;
763 }
764 Some(RegionServerParallelism {
765 semaphore: Semaphore::new(max_concurrent_queries),
766 timeout: concurrent_query_limiter_timeout,
767 })
768 }
769
770 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
771 timeout(self.timeout, self.semaphore.acquire())
772 .await
773 .context(ConcurrentQueryLimiterTimeoutSnafu)?
774 .context(ConcurrentQueryLimiterClosedSnafu)
775 }
776}
777
/// Outcome of resolving the engine for a region request.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// No engine call is needed; the request is answered right away with
    /// this number of affected rows.
    EarlyReturn(AffectedRows),
}
782
783impl Debug for CurrentEngine {
784 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
785 match self {
786 CurrentEngine::Engine(engine) => f
787 .debug_struct("CurrentEngine")
788 .field("engine", &engine.name())
789 .finish(),
790 CurrentEngine::EarlyReturn(rows) => f
791 .debug_struct("CurrentEngine")
792 .field("return", rows)
793 .finish(),
794 }
795 }
796}
797
798impl RegionServerInner {
799 pub fn new(
800 query_engine: QueryEngineRef,
801 runtime: Runtime,
802 event_listener: RegionServerEventListenerRef,
803 table_provider_factory: TableProviderFactoryRef,
804 parallelism: Option<RegionServerParallelism>,
805 ) -> Self {
806 Self {
807 engines: RwLock::new(HashMap::new()),
808 region_map: DashMap::new(),
809 query_engine,
810 runtime,
811 event_listener,
812 table_provider_factory,
813 parallelism,
814 topic_stats_reporter: RwLock::new(None),
815 mito_engine: RwLock::new(None),
816 }
817 }
818
819 pub fn register_engine(&self, engine: RegionEngineRef) {
820 let engine_name = engine.name();
821 if engine_name == MITO_ENGINE_NAME
822 && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
823 {
824 *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
825 }
826
827 info!("Region Engine {engine_name} is registered");
828 self.engines
829 .write()
830 .unwrap()
831 .insert(engine_name.to_string(), engine);
832 }
833
834 pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
835 info!("Set topic stats reporter");
836 *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
837 }
838
839 fn get_engine(
840 &self,
841 region_id: RegionId,
842 region_change: &RegionChange,
843 ) -> Result<CurrentEngine> {
844 let current_region_status = self.region_map.get(®ion_id);
845
846 let engine = match region_change {
847 RegionChange::Register(attribute) => match current_region_status {
848 Some(status) => match status.clone() {
849 RegionEngineWithStatus::Registering(engine) => engine,
850 RegionEngineWithStatus::Deregistering(_) => {
851 return error::RegionBusySnafu { region_id }.fail();
852 }
853 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
854 },
855 _ => self
856 .engines
857 .read()
858 .unwrap()
859 .get(attribute.engine())
860 .with_context(|| RegionEngineNotFoundSnafu {
861 name: attribute.engine(),
862 })?
863 .clone(),
864 },
865 RegionChange::Deregisters => match current_region_status {
866 Some(status) => match status.clone() {
867 RegionEngineWithStatus::Registering(_) => {
868 return error::RegionBusySnafu { region_id }.fail();
869 }
870 RegionEngineWithStatus::Deregistering(_) => {
871 return Ok(CurrentEngine::EarlyReturn(0));
872 }
873 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
874 },
875 None => return Ok(CurrentEngine::EarlyReturn(0)),
876 },
877 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
878 match current_region_status {
879 Some(status) => match status.clone() {
880 RegionEngineWithStatus::Registering(_) => {
881 return error::RegionNotReadySnafu { region_id }.fail();
882 }
883 RegionEngineWithStatus::Deregistering(_) => {
884 return error::RegionNotFoundSnafu { region_id }.fail();
885 }
886 RegionEngineWithStatus::Ready(engine) => engine,
887 },
888 None => return error::RegionNotFoundSnafu { region_id }.fail(),
889 }
890 }
891 };
892
893 Ok(CurrentEngine::Engine(engine))
894 }
895
896 async fn handle_batch_open_requests_inner(
897 &self,
898 engine: RegionEngineRef,
899 parallelism: usize,
900 requests: Vec<(RegionId, RegionOpenRequest)>,
901 ignore_nonexistent_region: bool,
902 ) -> Result<Vec<RegionId>> {
903 let region_changes = requests
904 .iter()
905 .map(|(region_id, open)| {
906 let attribute = parse_region_attribute(&open.engine, &open.options)?;
907 Ok((*region_id, RegionChange::Register(attribute)))
908 })
909 .collect::<Result<HashMap<_, _>>>()?;
910
911 for (®ion_id, region_change) in ®ion_changes {
912 self.set_region_status_not_ready(region_id, &engine, region_change)
913 }
914
915 let mut open_regions = Vec::with_capacity(requests.len());
916 let mut errors = vec![];
917 match engine
918 .handle_batch_open_requests(parallelism, requests)
919 .await
920 .with_context(|_| HandleBatchOpenRequestSnafu)
921 {
922 Ok(results) => {
923 for (region_id, result) in results {
924 let region_change = ®ion_changes[®ion_id];
925 match result {
926 Ok(_) => {
927 if let Err(e) = self
928 .set_region_status_ready(region_id, engine.clone(), *region_change)
929 .await
930 {
931 error!(e; "Failed to set region to ready: {}", region_id);
932 errors.push(BoxedError::new(e));
933 } else {
934 open_regions.push(region_id)
935 }
936 }
937 Err(e) => {
938 self.unset_region_status(region_id, &engine, *region_change);
939 if e.status_code() == StatusCode::RegionNotFound
940 && ignore_nonexistent_region
941 {
942 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
943 } else {
944 error!(e; "Failed to open region: {}", region_id);
945 errors.push(e);
946 }
947 }
948 }
949 }
950 }
951 Err(e) => {
952 for (®ion_id, region_change) in ®ion_changes {
953 self.unset_region_status(region_id, &engine, *region_change);
954 }
955 error!(e; "Failed to open batch regions");
956 errors.push(BoxedError::new(e));
957 }
958 }
959
960 if !errors.is_empty() {
961 return error::UnexpectedSnafu {
962 violated: format!("Failed to open batch regions: {:?}", errors[0]),
964 }
965 .fail();
966 }
967
968 Ok(open_regions)
969 }
970
971 pub async fn handle_batch_open_requests(
972 &self,
973 parallelism: usize,
974 requests: Vec<(RegionId, RegionOpenRequest)>,
975 ignore_nonexistent_region: bool,
976 ) -> Result<Vec<RegionId>> {
977 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
978 HashMap::with_capacity(requests.len());
979 for (region_id, request) in requests {
980 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
981 requests.push((region_id, request));
982 } else {
983 engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
984 }
985 }
986
987 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
988 for (engine, requests) in engine_grouped_requests {
989 let engine = self
990 .engines
991 .read()
992 .unwrap()
993 .get(&engine)
994 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
995 .clone();
996 results.push(
997 self.handle_batch_open_requests_inner(
998 engine,
999 parallelism,
1000 requests,
1001 ignore_nonexistent_region,
1002 )
1003 .await,
1004 )
1005 }
1006
1007 Ok(results
1008 .into_iter()
1009 .collect::<Result<Vec<_>>>()?
1010 .into_iter()
1011 .flatten()
1012 .collect::<Vec<_>>())
1013 }
1014
1015 pub async fn handle_batch_catchup_requests_inner(
1016 &self,
1017 engine: RegionEngineRef,
1018 parallelism: usize,
1019 requests: Vec<(RegionId, RegionCatchupRequest)>,
1020 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1021 for (region_id, _) in &requests {
1022 self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1023 }
1024 let region_ids = requests
1025 .iter()
1026 .map(|(region_id, _)| *region_id)
1027 .collect::<Vec<_>>();
1028 let mut responses = Vec::with_capacity(requests.len());
1029 match engine
1030 .handle_batch_catchup_requests(parallelism, requests)
1031 .await
1032 {
1033 Ok(results) => {
1034 for (region_id, result) in results {
1035 match result {
1036 Ok(_) => {
1037 if let Err(e) = self
1038 .set_region_status_ready(
1039 region_id,
1040 engine.clone(),
1041 RegionChange::Catchup,
1042 )
1043 .await
1044 {
1045 error!(e; "Failed to set region to ready: {}", region_id);
1046 responses.push((region_id, Err(BoxedError::new(e))));
1047 } else {
1048 responses.push((region_id, Ok(())));
1049 }
1050 }
1051 Err(e) => {
1052 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1053 error!(e; "Failed to catchup region: {}", region_id);
1054 responses.push((region_id, Err(e)));
1055 }
1056 }
1057 }
1058 }
1059 Err(e) => {
1060 for region_id in region_ids {
1061 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1062 }
1063 error!(e; "Failed to catchup batch regions");
1064 return error::UnexpectedSnafu {
1065 violated: format!("Failed to catchup batch regions: {:?}", e),
1066 }
1067 .fail();
1068 }
1069 }
1070
1071 Ok(responses)
1072 }
1073
1074 pub async fn handle_batch_catchup_requests(
1075 &self,
1076 parallelism: usize,
1077 requests: Vec<(RegionId, RegionCatchupRequest)>,
1078 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1079 let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1080
1081 let mut responses = Vec::with_capacity(requests.len());
1082 for (region_id, request) in requests {
1083 if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1084 match engine {
1085 CurrentEngine::Engine(engine) => {
1086 engine_grouped_requests
1087 .entry(engine.name().to_string())
1088 .or_default()
1089 .push((region_id, request));
1090 }
1091 CurrentEngine::EarlyReturn(_) => {
1092 return error::UnexpectedSnafu {
1093 violated: format!("Unexpected engine type for region {}", region_id),
1094 }
1095 .fail();
1096 }
1097 }
1098 } else {
1099 responses.push((
1100 region_id,
1101 Err(BoxedError::new(
1102 error::RegionNotFoundSnafu { region_id }.build(),
1103 )),
1104 ));
1105 }
1106 }
1107
1108 for (engine, requests) in engine_grouped_requests {
1109 let engine = self
1110 .engines
1111 .read()
1112 .unwrap()
1113 .get(&engine)
1114 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1115 .clone();
1116 responses.extend(
1117 self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1118 .await?,
1119 );
1120 }
1121
1122 Ok(responses)
1123 }
1124
1125 pub async fn handle_batch_request(
1129 &self,
1130 batch_request: BatchRegionDdlRequest,
1131 ) -> Result<RegionResponse> {
1132 let region_changes = match &batch_request {
1133 BatchRegionDdlRequest::Create(requests) => requests
1134 .iter()
1135 .map(|(region_id, create)| {
1136 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1137 Ok((*region_id, RegionChange::Register(attribute)))
1138 })
1139 .collect::<Result<Vec<_>>>()?,
1140 BatchRegionDdlRequest::Drop(requests) => requests
1141 .iter()
1142 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1143 .collect::<Vec<_>>(),
1144 BatchRegionDdlRequest::Alter(requests) => requests
1145 .iter()
1146 .map(|(region_id, _)| (*region_id, RegionChange::None))
1147 .collect::<Vec<_>>(),
1148 };
1149
1150 let (first_region_id, first_region_change) = region_changes.first().unwrap();
1153 let engine = match self.get_engine(*first_region_id, first_region_change)? {
1154 CurrentEngine::Engine(engine) => engine,
1155 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1156 };
1157
1158 for (region_id, region_change) in region_changes.iter() {
1159 self.set_region_status_not_ready(*region_id, &engine, region_change);
1160 }
1161
1162 let ddl_type = batch_request.request_type();
1163 let result = engine
1164 .handle_batch_ddl_requests(batch_request)
1165 .await
1166 .context(HandleBatchDdlRequestSnafu { ddl_type });
1167
1168 match result {
1169 Ok(result) => {
1170 for (region_id, region_change) in ®ion_changes {
1171 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1172 .await?;
1173 }
1174
1175 Ok(RegionResponse {
1176 affected_rows: result.affected_rows,
1177 extensions: result.extensions,
1178 metadata: Vec::new(),
1179 })
1180 }
1181 Err(err) => {
1182 for (region_id, region_change) in region_changes {
1183 self.unset_region_status(region_id, &engine, region_change);
1184 }
1185
1186 Err(err)
1187 }
1188 }
1189 }
1190
1191 pub async fn handle_request(
1192 &self,
1193 region_id: RegionId,
1194 request: RegionRequest,
1195 ) -> Result<RegionResponse> {
1196 let request_type = request.request_type();
1197 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1198 .with_label_values(&[request_type])
1199 .start_timer();
1200
1201 let region_change = match &request {
1202 RegionRequest::Create(create) => {
1203 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1204 RegionChange::Register(attribute)
1205 }
1206 RegionRequest::Open(open) => {
1207 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1208 RegionChange::Register(attribute)
1209 }
1210 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1211 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1212 RegionChange::Ingest
1213 }
1214 RegionRequest::Alter(_)
1215 | RegionRequest::Flush(_)
1216 | RegionRequest::Compact(_)
1217 | RegionRequest::Truncate(_)
1218 | RegionRequest::BuildIndex(_)
1219 | RegionRequest::EnterStaging(_) => RegionChange::None,
1220 RegionRequest::Catchup(_) => RegionChange::Catchup,
1221 };
1222
1223 let engine = match self.get_engine(region_id, ®ion_change)? {
1224 CurrentEngine::Engine(engine) => engine,
1225 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1226 };
1227
1228 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1230
1231 match engine
1232 .handle_request(region_id, request)
1233 .await
1234 .with_context(|_| HandleRegionRequestSnafu { region_id })
1235 {
1236 Ok(result) => {
1237 if matches!(region_change, RegionChange::Ingest) {
1239 crate::metrics::REGION_CHANGED_ROW_COUNT
1240 .with_label_values(&[request_type])
1241 .inc_by(result.affected_rows as u64);
1242 }
1243 self.set_region_status_ready(region_id, engine.clone(), region_change)
1245 .await?;
1246
1247 Ok(RegionResponse {
1248 affected_rows: result.affected_rows,
1249 extensions: result.extensions,
1250 metadata: Vec::new(),
1251 })
1252 }
1253 Err(err) => {
1254 if matches!(region_change, RegionChange::Ingest) {
1255 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1256 .with_label_values(&[request_type])
1257 .inc();
1258 }
1259 self.unset_region_status(region_id, &engine, region_change);
1261 Err(err)
1262 }
1263 }
1264 }
1265
1266 pub async fn handle_sync_region(
1268 &self,
1269 engine: &RegionEngineRef,
1270 region_id: RegionId,
1271 manifest_info: RegionManifestInfo,
1272 ) -> Result<()> {
1273 let Some(new_opened_regions) = engine
1274 .sync_region(region_id, manifest_info)
1275 .await
1276 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1277 .new_opened_logical_region_ids()
1278 else {
1279 return Ok(());
1280 };
1281
1282 for region in new_opened_regions {
1283 self.region_map
1284 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1285 info!("Logical region {} is registered!", region);
1286 }
1287
1288 Ok(())
1289 }
1290
1291 fn set_region_status_not_ready(
1292 &self,
1293 region_id: RegionId,
1294 engine: &RegionEngineRef,
1295 region_change: &RegionChange,
1296 ) {
1297 match region_change {
1298 RegionChange::Register(_) => {
1299 self.region_map.insert(
1300 region_id,
1301 RegionEngineWithStatus::Registering(engine.clone()),
1302 );
1303 }
1304 RegionChange::Deregisters => {
1305 self.region_map.insert(
1306 region_id,
1307 RegionEngineWithStatus::Deregistering(engine.clone()),
1308 );
1309 }
1310 _ => {}
1311 }
1312 }
1313
1314 fn unset_region_status(
1315 &self,
1316 region_id: RegionId,
1317 engine: &RegionEngineRef,
1318 region_change: RegionChange,
1319 ) {
1320 match region_change {
1321 RegionChange::None | RegionChange::Ingest => {}
1322 RegionChange::Register(_) => {
1323 self.region_map.remove(®ion_id);
1324 }
1325 RegionChange::Deregisters => {
1326 self.region_map
1327 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1328 }
1329 RegionChange::Catchup => {}
1330 }
1331 }
1332
1333 async fn set_region_status_ready(
1334 &self,
1335 region_id: RegionId,
1336 engine: RegionEngineRef,
1337 region_change: RegionChange,
1338 ) -> Result<()> {
1339 let engine_type = engine.name();
1340 match region_change {
1341 RegionChange::None | RegionChange::Ingest => {}
1342 RegionChange::Register(attribute) => {
1343 info!(
1344 "Region {region_id} is registered to engine {}",
1345 attribute.engine()
1346 );
1347 self.region_map
1348 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1349
1350 match attribute {
1351 RegionAttribute::Metric { physical } => {
1352 if physical {
1353 self.register_logical_regions(&engine, region_id).await?;
1355 self.event_listener.on_region_registered(region_id);
1357 }
1358 }
1359 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1360 RegionAttribute::File => {
1361 }
1363 }
1364 }
1365 RegionChange::Deregisters => {
1366 info!("Region {region_id} is deregistered from engine {engine_type}");
1367 self.region_map
1368 .remove(®ion_id)
1369 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1370 self.event_listener.on_region_deregistered(region_id);
1371 }
1372 RegionChange::Catchup => {
1373 if is_metric_engine(engine.name()) {
1374 self.register_logical_regions(&engine, region_id).await?;
1376 }
1377 }
1378 }
1379 Ok(())
1380 }
1381
1382 async fn register_logical_regions(
1383 &self,
1384 engine: &RegionEngineRef,
1385 physical_region_id: RegionId,
1386 ) -> Result<()> {
1387 let metric_engine =
1388 engine
1389 .as_any()
1390 .downcast_ref::<MetricEngine>()
1391 .context(UnexpectedSnafu {
1392 violated: format!(
1393 "expecting engine type '{}', actual '{}'",
1394 METRIC_ENGINE_NAME,
1395 engine.name(),
1396 ),
1397 })?;
1398
1399 let logical_regions = metric_engine
1400 .logical_regions(physical_region_id)
1401 .await
1402 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1403
1404 for region in logical_regions {
1405 self.region_map
1406 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1407 info!("Logical region {} is registered!", region);
1408 }
1409 Ok(())
1410 }
1411
1412 pub async fn handle_read(
1413 &self,
1414 request: QueryRequest,
1415 query_ctx: QueryContextRef,
1416 ) -> Result<SendableRecordBatchStream> {
1417 let result = self
1420 .query_engine
1421 .execute(request.plan, query_ctx)
1422 .await
1423 .context(ExecuteLogicalPlanSnafu)?;
1424
1425 match result.data {
1426 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1427 UnsupportedOutputSnafu { expected: "stream" }.fail()
1428 }
1429 OutputData::Stream(stream) => Ok(stream),
1430 }
1431 }
1432
1433 async fn stop(&self) -> Result<()> {
1434 let regions = self
1442 .region_map
1443 .iter()
1444 .map(|x| (*x.key(), x.value().clone()))
1445 .collect::<Vec<_>>();
1446 let num_regions = regions.len();
1447
1448 for (region_id, engine) in regions {
1449 let closed = engine
1450 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1451 .await;
1452 match closed {
1453 Ok(_) => debug!("Region {region_id} is closed"),
1454 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1455 }
1456 }
1457 self.region_map.clear();
1458 info!("closed {num_regions} regions");
1459
1460 drop(self.mito_engine.write().unwrap().take());
1461 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1462 for (engine_name, engine) in engines {
1463 engine
1464 .stop()
1465 .await
1466 .context(StopRegionEngineSnafu { name: &engine_name })?;
1467 info!("Region engine {engine_name} is stopped");
1468 }
1469
1470 Ok(())
1471 }
1472}
1473
/// How a request affects the bookkeeping of a region in the region map.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the region's registration
    /// (e.g. alter, flush, compact, truncate).
    None,
    /// The request registers the region (create or open); carries the engine attribute.
    Register(RegionAttribute),
    /// The request deregisters the region (close or drop).
    Deregisters,
    /// The request is a catchup request.
    Catchup,
    /// The request writes data (put, delete or bulk insert).
    Ingest,
}
1482
/// Returns `true` when `engine` is the name of the metric engine.
fn is_metric_engine(engine: &str) -> bool {
    matches!(engine, METRIC_ENGINE_NAME)
}
1486
1487fn parse_region_attribute(
1488 engine: &str,
1489 options: &HashMap<String, String>,
1490) -> Result<RegionAttribute> {
1491 match engine {
1492 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1493 METRIC_ENGINE_NAME => {
1494 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1495
1496 Ok(RegionAttribute::Metric { physical })
1497 }
1498 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1499 _ => error::UnexpectedSnafu {
1500 violated: format!("Unknown engine: {}", engine),
1501 }
1502 .fail(),
1503 }
1504}
1505
/// The engine a region belongs to, plus engine-specific details needed when registering it.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region of the mito engine.
    Mito,
    /// A region of the metric engine; `physical` is `false` for logical regions.
    Metric { physical: bool },
    /// A region of the file engine.
    File,
}
1512
1513impl RegionAttribute {
1514 fn engine(&self) -> &'static str {
1515 match self {
1516 RegionAttribute::Mito => MITO_ENGINE_NAME,
1517 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1518 RegionAttribute::File => FILE_ENGINE_NAME,
1519 }
1520 }
1521}
1522
1523#[cfg(test)]
1524mod tests {
1525
1526 use std::assert_matches::assert_matches;
1527
1528 use api::v1::SemanticType;
1529 use common_error::ext::ErrorExt;
1530 use datatypes::prelude::ConcreteDataType;
1531 use mito2::test_util::CreateRequestBuilder;
1532 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1533 use store_api::region_engine::RegionEngine;
1534 use store_api::region_request::{
1535 PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1536 };
1537 use store_api::storage::RegionId;
1538
1539 use super::*;
1540 use crate::error::Result;
1541 use crate::tests::{MockRegionEngine, mock_region_server};
1542
1543 #[tokio::test]
1544 async fn test_region_registering() {
1545 common_telemetry::init_default_ut_logging();
1546
1547 let mut mock_region_server = mock_region_server();
1548 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1549 let engine_name = engine.name();
1550 mock_region_server.register_engine(engine.clone());
1551 let region_id = RegionId::new(1, 1);
1552 let builder = CreateRequestBuilder::new();
1553 let create_req = builder.build();
1554 mock_region_server.inner.region_map.insert(
1556 region_id,
1557 RegionEngineWithStatus::Registering(engine.clone()),
1558 );
1559 let response = mock_region_server
1560 .handle_request(region_id, RegionRequest::Create(create_req))
1561 .await
1562 .unwrap();
1563 assert_eq!(response.affected_rows, 0);
1564 let status = mock_region_server
1565 .inner
1566 .region_map
1567 .get(®ion_id)
1568 .unwrap()
1569 .clone();
1570 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1571
1572 mock_region_server.inner.region_map.insert(
1573 region_id,
1574 RegionEngineWithStatus::Registering(engine.clone()),
1575 );
1576 let response = mock_region_server
1577 .handle_request(
1578 region_id,
1579 RegionRequest::Open(RegionOpenRequest {
1580 engine: engine_name.to_string(),
1581 table_dir: String::new(),
1582 path_type: PathType::Bare,
1583 options: Default::default(),
1584 skip_wal_replay: false,
1585 checkpoint: None,
1586 }),
1587 )
1588 .await
1589 .unwrap();
1590 assert_eq!(response.affected_rows, 0);
1591 let status = mock_region_server
1592 .inner
1593 .region_map
1594 .get(®ion_id)
1595 .unwrap()
1596 .clone();
1597 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1598 }
1599
1600 #[tokio::test]
1601 async fn test_region_deregistering() {
1602 common_telemetry::init_default_ut_logging();
1603
1604 let mut mock_region_server = mock_region_server();
1605 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1606
1607 mock_region_server.register_engine(engine.clone());
1608
1609 let region_id = RegionId::new(1, 1);
1610
1611 mock_region_server.inner.region_map.insert(
1613 region_id,
1614 RegionEngineWithStatus::Deregistering(engine.clone()),
1615 );
1616
1617 let response = mock_region_server
1618 .handle_request(
1619 region_id,
1620 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1621 )
1622 .await
1623 .unwrap();
1624 assert_eq!(response.affected_rows, 0);
1625
1626 let status = mock_region_server
1627 .inner
1628 .region_map
1629 .get(®ion_id)
1630 .unwrap()
1631 .clone();
1632 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1633
1634 mock_region_server.inner.region_map.insert(
1635 region_id,
1636 RegionEngineWithStatus::Deregistering(engine.clone()),
1637 );
1638
1639 let response = mock_region_server
1640 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1641 .await
1642 .unwrap();
1643 assert_eq!(response.affected_rows, 0);
1644
1645 let status = mock_region_server
1646 .inner
1647 .region_map
1648 .get(®ion_id)
1649 .unwrap()
1650 .clone();
1651 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1652 }
1653
1654 #[tokio::test]
1655 async fn test_region_not_ready() {
1656 common_telemetry::init_default_ut_logging();
1657
1658 let mut mock_region_server = mock_region_server();
1659 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1660
1661 mock_region_server.register_engine(engine.clone());
1662
1663 let region_id = RegionId::new(1, 1);
1664
1665 mock_region_server.inner.region_map.insert(
1667 region_id,
1668 RegionEngineWithStatus::Registering(engine.clone()),
1669 );
1670
1671 let err = mock_region_server
1672 .handle_request(
1673 region_id,
1674 RegionRequest::Truncate(RegionTruncateRequest::All),
1675 )
1676 .await
1677 .unwrap_err();
1678
1679 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1680 }
1681
1682 #[tokio::test]
1683 async fn test_region_request_failed() {
1684 common_telemetry::init_default_ut_logging();
1685
1686 let mut mock_region_server = mock_region_server();
1687 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1688 MITO_ENGINE_NAME,
1689 Box::new(|_region_id, _request| {
1690 error::UnexpectedSnafu {
1691 violated: "test".to_string(),
1692 }
1693 .fail()
1694 }),
1695 );
1696
1697 mock_region_server.register_engine(engine.clone());
1698
1699 let region_id = RegionId::new(1, 1);
1700 let builder = CreateRequestBuilder::new();
1701 let create_req = builder.build();
1702 mock_region_server
1703 .handle_request(region_id, RegionRequest::Create(create_req))
1704 .await
1705 .unwrap_err();
1706
1707 let status = mock_region_server.inner.region_map.get(®ion_id);
1708 assert!(status.is_none());
1709
1710 mock_region_server
1711 .inner
1712 .region_map
1713 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1714
1715 mock_region_server
1716 .handle_request(
1717 region_id,
1718 RegionRequest::Drop(RegionDropRequest { fast_path: false }),
1719 )
1720 .await
1721 .unwrap_err();
1722
1723 let status = mock_region_server.inner.region_map.get(®ion_id);
1724 assert!(status.is_some());
1725 }
1726
1727 #[tokio::test]
1728 async fn test_batch_open_region_ignore_nonexistent_regions() {
1729 common_telemetry::init_default_ut_logging();
1730 let mut mock_region_server = mock_region_server();
1731 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1732 MITO_ENGINE_NAME,
1733 Box::new(|region_id, _request| {
1734 if region_id == RegionId::new(1, 1) {
1735 error::RegionNotFoundSnafu { region_id }.fail()
1736 } else {
1737 Ok(0)
1738 }
1739 }),
1740 );
1741 mock_region_server.register_engine(engine.clone());
1742
1743 let region_ids = mock_region_server
1744 .handle_batch_open_requests(
1745 8,
1746 vec![
1747 (
1748 RegionId::new(1, 1),
1749 RegionOpenRequest {
1750 engine: MITO_ENGINE_NAME.to_string(),
1751 table_dir: String::new(),
1752 path_type: PathType::Bare,
1753 options: Default::default(),
1754 skip_wal_replay: false,
1755 checkpoint: None,
1756 },
1757 ),
1758 (
1759 RegionId::new(1, 2),
1760 RegionOpenRequest {
1761 engine: MITO_ENGINE_NAME.to_string(),
1762 table_dir: String::new(),
1763 path_type: PathType::Bare,
1764 options: Default::default(),
1765 skip_wal_replay: false,
1766 checkpoint: None,
1767 },
1768 ),
1769 ],
1770 true,
1771 )
1772 .await
1773 .unwrap();
1774 assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1775
1776 let err = mock_region_server
1777 .handle_batch_open_requests(
1778 8,
1779 vec![
1780 (
1781 RegionId::new(1, 1),
1782 RegionOpenRequest {
1783 engine: MITO_ENGINE_NAME.to_string(),
1784 table_dir: String::new(),
1785 path_type: PathType::Bare,
1786 options: Default::default(),
1787 skip_wal_replay: false,
1788 checkpoint: None,
1789 },
1790 ),
1791 (
1792 RegionId::new(1, 2),
1793 RegionOpenRequest {
1794 engine: MITO_ENGINE_NAME.to_string(),
1795 table_dir: String::new(),
1796 path_type: PathType::Bare,
1797 options: Default::default(),
1798 skip_wal_replay: false,
1799 checkpoint: None,
1800 },
1801 ),
1802 ],
1803 false,
1804 )
1805 .await
1806 .unwrap_err();
1807 assert_eq!(err.status_code(), StatusCode::Unexpected);
1808 }
1809
    /// One table-driven case for `test_current_engine`.
    struct CurrentEngineTest {
        /// Region passed to `get_engine`.
        region_id: RegionId,
        /// Status seeded into the region map before the lookup; `None` removes the entry.
        current_region_status: Option<RegionEngineWithStatus>,
        /// Region change passed to `get_engine`.
        region_change: RegionChange,
        /// Callback that checks the result of the lookup.
        assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
    }
1816
1817 #[tokio::test]
1818 async fn test_current_engine() {
1819 common_telemetry::init_default_ut_logging();
1820
1821 let mut mock_region_server = mock_region_server();
1822 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1823 mock_region_server.register_engine(engine.clone());
1824
1825 let region_id = RegionId::new(1024, 1);
1826 let tests = vec![
1827 CurrentEngineTest {
1829 region_id,
1830 current_region_status: None,
1831 region_change: RegionChange::None,
1832 assert: Box::new(|result| {
1833 let err = result.unwrap_err();
1834 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1835 }),
1836 },
1837 CurrentEngineTest {
1838 region_id,
1839 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1840 region_change: RegionChange::None,
1841 assert: Box::new(|result| {
1842 let current_engine = result.unwrap();
1843 assert_matches!(current_engine, CurrentEngine::Engine(_));
1844 }),
1845 },
1846 CurrentEngineTest {
1847 region_id,
1848 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1849 region_change: RegionChange::None,
1850 assert: Box::new(|result| {
1851 let err = result.unwrap_err();
1852 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1853 }),
1854 },
1855 CurrentEngineTest {
1856 region_id,
1857 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1858 region_change: RegionChange::None,
1859 assert: Box::new(|result| {
1860 let err = result.unwrap_err();
1861 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1862 }),
1863 },
1864 CurrentEngineTest {
1866 region_id,
1867 current_region_status: None,
1868 region_change: RegionChange::Register(RegionAttribute::Mito),
1869 assert: Box::new(|result| {
1870 let current_engine = result.unwrap();
1871 assert_matches!(current_engine, CurrentEngine::Engine(_));
1872 }),
1873 },
1874 CurrentEngineTest {
1875 region_id,
1876 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1877 region_change: RegionChange::Register(RegionAttribute::Mito),
1878 assert: Box::new(|result| {
1879 let current_engine = result.unwrap();
1880 assert_matches!(current_engine, CurrentEngine::Engine(_));
1881 }),
1882 },
1883 CurrentEngineTest {
1884 region_id,
1885 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1886 region_change: RegionChange::Register(RegionAttribute::Mito),
1887 assert: Box::new(|result| {
1888 let err = result.unwrap_err();
1889 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1890 }),
1891 },
1892 CurrentEngineTest {
1893 region_id,
1894 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1895 region_change: RegionChange::Register(RegionAttribute::Mito),
1896 assert: Box::new(|result| {
1897 let current_engine = result.unwrap();
1898 assert_matches!(current_engine, CurrentEngine::Engine(_));
1899 }),
1900 },
1901 CurrentEngineTest {
1903 region_id,
1904 current_region_status: None,
1905 region_change: RegionChange::Deregisters,
1906 assert: Box::new(|result| {
1907 let current_engine = result.unwrap();
1908 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1909 }),
1910 },
1911 CurrentEngineTest {
1912 region_id,
1913 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1914 region_change: RegionChange::Deregisters,
1915 assert: Box::new(|result| {
1916 let err = result.unwrap_err();
1917 assert_eq!(err.status_code(), StatusCode::RegionBusy);
1918 }),
1919 },
1920 CurrentEngineTest {
1921 region_id,
1922 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1923 region_change: RegionChange::Deregisters,
1924 assert: Box::new(|result| {
1925 let current_engine = result.unwrap();
1926 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
1927 }),
1928 },
1929 CurrentEngineTest {
1930 region_id,
1931 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1932 region_change: RegionChange::Deregisters,
1933 assert: Box::new(|result| {
1934 let current_engine = result.unwrap();
1935 assert_matches!(current_engine, CurrentEngine::Engine(_));
1936 }),
1937 },
1938 ];
1939
1940 for test in tests {
1941 let CurrentEngineTest {
1942 region_id,
1943 current_region_status,
1944 region_change,
1945 assert,
1946 } = test;
1947
1948 if let Some(status) = current_region_status {
1950 mock_region_server
1951 .inner
1952 .region_map
1953 .insert(region_id, status);
1954 } else {
1955 mock_region_server.inner.region_map.remove(®ion_id);
1956 }
1957
1958 let result = mock_region_server
1959 .inner
1960 .get_engine(region_id, ®ion_change);
1961
1962 assert(result);
1963 }
1964 }
1965
1966 #[tokio::test]
1967 async fn test_region_server_parallelism() {
1968 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
1969 let first_query = p.acquire().await;
1970 assert!(first_query.is_ok());
1971 let second_query = p.acquire().await;
1972 assert!(second_query.is_ok());
1973 let third_query = p.acquire().await;
1974 assert!(third_query.is_err());
1975 let err = third_query.unwrap_err();
1976 assert_eq!(
1977 err.output_msg(),
1978 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
1979 );
1980 drop(first_query);
1981 let forth_query = p.acquire().await;
1982 assert!(forth_query.is_ok());
1983 }
1984
1985 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
1986 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
1987 metadata_builder.push_column_metadata(ColumnMetadata {
1988 column_schema: datatypes::schema::ColumnSchema::new(
1989 "timestamp",
1990 ConcreteDataType::timestamp_nanosecond_datatype(),
1991 false,
1992 ),
1993 semantic_type: SemanticType::Timestamp,
1994 column_id: 0,
1995 });
1996 metadata_builder.push_column_metadata(ColumnMetadata {
1997 column_schema: datatypes::schema::ColumnSchema::new(
1998 "file",
1999 ConcreteDataType::string_datatype(),
2000 true,
2001 ),
2002 semantic_type: SemanticType::Tag,
2003 column_id: 1,
2004 });
2005 metadata_builder.push_column_metadata(ColumnMetadata {
2006 column_schema: datatypes::schema::ColumnSchema::new(
2007 "message",
2008 ConcreteDataType::string_datatype(),
2009 true,
2010 ),
2011 semantic_type: SemanticType::Field,
2012 column_id: 2,
2013 });
2014 metadata_builder.primary_key(vec![1]);
2015 metadata_builder.build().unwrap()
2016 }
2017
2018 #[tokio::test]
2019 async fn test_handle_list_metadata_request() {
2020 common_telemetry::init_default_ut_logging();
2021
2022 let mut mock_region_server = mock_region_server();
2023 let region_id_1 = RegionId::new(1, 0);
2024 let region_id_2 = RegionId::new(2, 0);
2025
2026 let metadata_1 = mock_region_metadata(region_id_1);
2027 let metadata_2 = mock_region_metadata(region_id_2);
2028 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2029
2030 let metadata_1 = Arc::new(metadata_1);
2031 let metadata_2 = Arc::new(metadata_2);
2032 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2033 MITO_ENGINE_NAME,
2034 Box::new(move |region_id| {
2035 if region_id == region_id_1 {
2036 Ok(metadata_1.clone())
2037 } else if region_id == region_id_2 {
2038 Ok(metadata_2.clone())
2039 } else {
2040 error::RegionNotFoundSnafu { region_id }.fail()
2041 }
2042 }),
2043 );
2044
2045 mock_region_server.register_engine(engine.clone());
2046 mock_region_server
2047 .inner
2048 .region_map
2049 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2050 mock_region_server
2051 .inner
2052 .region_map
2053 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2054
2055 let list_metadata_request = ListMetadataRequest {
2057 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2058 };
2059 let response = mock_region_server
2060 .handle_list_metadata_request(&list_metadata_request)
2061 .await
2062 .unwrap();
2063 let decoded_metadata: Vec<Option<RegionMetadata>> =
2064 serde_json::from_slice(&response.metadata).unwrap();
2065 assert_eq!(metadatas, decoded_metadata);
2066 }
2067
2068 #[tokio::test]
2069 async fn test_handle_list_metadata_not_found() {
2070 common_telemetry::init_default_ut_logging();
2071
2072 let mut mock_region_server = mock_region_server();
2073 let region_id_1 = RegionId::new(1, 0);
2074 let region_id_2 = RegionId::new(2, 0);
2075
2076 let metadata_1 = mock_region_metadata(region_id_1);
2077 let metadatas = vec![Some(metadata_1.clone()), None];
2078
2079 let metadata_1 = Arc::new(metadata_1);
2080 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2081 MITO_ENGINE_NAME,
2082 Box::new(move |region_id| {
2083 if region_id == region_id_1 {
2084 Ok(metadata_1.clone())
2085 } else {
2086 error::RegionNotFoundSnafu { region_id }.fail()
2087 }
2088 }),
2089 );
2090
2091 mock_region_server.register_engine(engine.clone());
2092 mock_region_server
2093 .inner
2094 .region_map
2095 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2096
2097 let list_metadata_request = ListMetadataRequest {
2099 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2100 };
2101 let response = mock_region_server
2102 .handle_list_metadata_request(&list_metadata_request)
2103 .await
2104 .unwrap();
2105 let decoded_metadata: Vec<Option<RegionMetadata>> =
2106 serde_json::from_slice(&response.metadata).unwrap();
2107 assert_eq!(metadatas, decoded_metadata);
2108
2109 mock_region_server
2111 .inner
2112 .region_map
2113 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2114 let response = mock_region_server
2115 .handle_list_metadata_request(&list_metadata_request)
2116 .await
2117 .unwrap();
2118 let decoded_metadata: Vec<Option<RegionMetadata>> =
2119 serde_json::from_slice(&response.metadata).unwrap();
2120 assert_eq!(metadatas, decoded_metadata);
2121 }
2122
2123 #[tokio::test]
2124 async fn test_handle_list_metadata_failed() {
2125 common_telemetry::init_default_ut_logging();
2126
2127 let mut mock_region_server = mock_region_server();
2128 let region_id_1 = RegionId::new(1, 0);
2129
2130 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2131 MITO_ENGINE_NAME,
2132 Box::new(move |region_id| {
2133 error::UnexpectedSnafu {
2134 violated: format!("Failed to get region {region_id}"),
2135 }
2136 .fail()
2137 }),
2138 );
2139
2140 mock_region_server.register_engine(engine.clone());
2141 mock_region_server
2142 .inner
2143 .region_map
2144 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2145
2146 let list_metadata_request = ListMetadataRequest {
2148 region_ids: vec![region_id_1.as_u64()],
2149 };
2150 mock_region_server
2151 .handle_list_metadata_request(&list_metadata_request)
2152 .await
2153 .unwrap_err();
2154 }
2155}