1mod catalog;
16
17use std::collections::HashMap;
18use std::fmt::Debug;
19use std::ops::Deref;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use api::region::RegionResponse;
25use api::v1::meta::TopicStat;
26use api::v1::region::sync_request::ManifestInfo;
27use api::v1::region::{
28 ListMetadataRequest, RegionResponse as RegionResponseV1, SyncRequest, region_request,
29};
30use api::v1::{ResponseHeader, Status};
31use arrow_flight::{FlightData, Ticket};
32use async_trait::async_trait;
33use bytes::Bytes;
34use common_error::ext::{BoxedError, ErrorExt};
35use common_error::status_code::StatusCode;
36use common_meta::datanode::TopicStatsReporter;
37use common_query::OutputData;
38use common_query::request::QueryRequest;
39use common_recordbatch::SendableRecordBatchStream;
40use common_runtime::Runtime;
41use common_telemetry::tracing::{self, info_span};
42use common_telemetry::tracing_context::{FutureExt, TracingContext};
43use common_telemetry::{debug, error, info, warn};
44use dashmap::DashMap;
45use datafusion::datasource::TableProvider;
46use datafusion_common::tree_node::TreeNode;
47use either::Either;
48use futures_util::future::try_join_all;
49use metric_engine::engine::MetricEngine;
50use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
51use prost::Message;
52use query::QueryEngineRef;
53pub use query::dummy_catalog::{
54 DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
55};
56use serde_json;
57use servers::error::{
58 self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
59};
60use servers::grpc::FlightCompression;
61use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
62use servers::grpc::region_server::RegionServerHandler;
63use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
64use snafu::{OptionExt, ResultExt, ensure};
65use store_api::metric_engine_consts::{
66 FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
67};
68use store_api::region_engine::{
69 RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
70 RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
71 SyncRegionFromRequest,
72};
73use store_api::region_request::{
74 AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
75 RegionOpenRequest, RegionRequest,
76};
77use store_api::storage::RegionId;
78use tokio::sync::{Semaphore, SemaphorePermit};
79use tokio::time::timeout;
80use tonic::{Request, Response, Result as TonicResult};
81
82use crate::error::{
83 self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
84 ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
85 ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, GetRegionMetadataSnafu,
86 HandleBatchDdlRequestSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
87 NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu,
88 Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
89};
90use crate::event_listener::RegionServerEventListenerRef;
91use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
92
/// Handle to the region server.
///
/// Clones share the same [`RegionServerInner`] and the same suspend flag, because both
/// are stored behind an `Arc`.
#[derive(Clone)]
pub struct RegionServer {
    /// Shared state that the request-handling methods delegate to.
    inner: Arc<RegionServerInner>,
    /// Compression setting handed to the `FlightRecordBatchStream` built in `do_get`.
    flight_compression: FlightCompression,
    /// While this flag is `true`, `do_get` rejects requests with a `Suspended` error.
    suspend: Arc<AtomicBool>,
}
99
/// Summary of a single region, as produced by [`RegionServer::reportable_regions`].
pub struct RegionStat {
    /// Id of the region.
    pub region_id: RegionId,
    /// Name of the engine the region is registered with.
    pub engine: String,
    /// Role the engine reported for the region.
    pub role: RegionRole,
}
105
106impl RegionServer {
107 pub fn new(
108 query_engine: QueryEngineRef,
109 runtime: Runtime,
110 event_listener: RegionServerEventListenerRef,
111 flight_compression: FlightCompression,
112 ) -> Self {
113 Self::with_table_provider(
114 query_engine,
115 runtime,
116 event_listener,
117 Arc::new(DummyTableProviderFactory),
118 0,
119 Duration::from_millis(0),
120 flight_compression,
121 )
122 }
123
124 pub fn with_table_provider(
125 query_engine: QueryEngineRef,
126 runtime: Runtime,
127 event_listener: RegionServerEventListenerRef,
128 table_provider_factory: TableProviderFactoryRef,
129 max_concurrent_queries: usize,
130 concurrent_query_limiter_timeout: Duration,
131 flight_compression: FlightCompression,
132 ) -> Self {
133 Self {
134 inner: Arc::new(RegionServerInner::new(
135 query_engine,
136 runtime,
137 event_listener,
138 table_provider_factory,
139 RegionServerParallelism::from_opts(
140 max_concurrent_queries,
141 concurrent_query_limiter_timeout,
142 ),
143 )),
144 flight_compression,
145 suspend: Arc::new(AtomicBool::new(false)),
146 }
147 }
148
    /// Registers `engine` with the shared inner state by forwarding to
    /// `RegionServerInner::register_engine`.
    pub fn register_engine(&mut self, engine: RegionEngineRef) {
        self.inner.register_engine(engine);
    }
153
    /// Installs the topic stats reporter that [`RegionServer::topic_stats`] reads from.
    pub fn set_topic_stats_reporter(&mut self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
        self.inner.set_topic_stats_reporter(topic_stats_reporter);
    }
158
159 pub fn find_engine(&self, region_id: RegionId) -> Result<Option<RegionEngineRef>> {
161 match self.inner.get_engine(region_id, &RegionChange::None) {
162 Ok(CurrentEngine::Engine(engine)) => Ok(Some(engine)),
163 Ok(CurrentEngine::EarlyReturn(_)) => Ok(None),
164 Err(error::Error::RegionNotFound { .. }) => Ok(None),
165 Err(err) => Err(err),
166 }
167 }
168
169 pub fn mito_engine(&self) -> Option<MitoEngine> {
171 if let Some(mito) = self.inner.mito_engine.read().unwrap().clone() {
172 Some(mito)
173 } else {
174 self.inner
175 .engines
176 .read()
177 .unwrap()
178 .get(MITO_ENGINE_NAME)
179 .cloned()
180 .and_then(|e| {
181 let mito = e.as_any().downcast_ref::<MitoEngine>().cloned();
182 if mito.is_none() {
183 warn!("Mito engine not found in region server engines");
184 }
185 mito
186 })
187 }
188 }
189
    /// Opens a batch of regions by delegating to
    /// `RegionServerInner::handle_batch_open_requests`.
    ///
    /// `parallelism` and `ignore_nonexistent_region` are passed through unchanged.
    /// On success the returned vector holds the ids of the regions that were opened.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
        ignore_nonexistent_region: bool,
    ) -> Result<Vec<RegionId>> {
        self.inner
            .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
            .await
    }
201
    /// Forwards a batch of catchup requests to
    /// `RegionServerInner::handle_batch_catchup_requests`.
    ///
    /// The result pairs each region id with the outcome of its own catchup.
    #[tracing::instrument(skip_all)]
    pub async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
    }
212
    /// Forwards a single region request to `RegionServerInner::handle_request`.
    ///
    /// The request type is recorded as the `request_type` field of the tracing span.
    #[tracing::instrument(skip_all, fields(request_type = request.request_type()))]
    pub async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse> {
        self.inner.handle_request(region_id, request).await
    }
221
222 async fn table_provider(
224 &self,
225 region_id: RegionId,
226 ctx: Option<QueryContextRef>,
227 ) -> Result<Arc<dyn TableProvider>> {
228 let status = self
229 .inner
230 .region_map
231 .get(®ion_id)
232 .context(RegionNotFoundSnafu { region_id })?
233 .clone();
234 ensure!(
235 matches!(status, RegionEngineWithStatus::Ready(_)),
236 RegionNotReadySnafu { region_id }
237 );
238
239 self.inner
240 .table_provider_factory
241 .create(region_id, status.into_engine(), ctx)
242 .await
243 .context(ExecuteLogicalPlanSnafu)
244 }
245
246 pub async fn handle_remote_read(
248 &self,
249 request: api::v1::region::QueryRequest,
250 query_ctx: QueryContextRef,
251 ) -> Result<SendableRecordBatchStream> {
252 let _permit = if let Some(p) = &self.inner.parallelism {
253 Some(p.acquire().await?)
254 } else {
255 None
256 };
257
258 let region_id = RegionId::from_u64(request.region_id);
259 let catalog_list = Arc::new(NameAwareCatalogList::new(
260 self.clone(),
261 region_id,
262 query_ctx.clone(),
263 ));
264
265 if query_ctx.explain_verbose() {
266 common_telemetry::info!("Handle remote read for region: {}", region_id);
267 }
268
269 let decoder = self
270 .inner
271 .query_engine
272 .engine_context(query_ctx.clone())
273 .new_plan_decoder()
274 .context(NewPlanDecoderSnafu)?;
275
276 let plan = decoder
277 .decode(Bytes::from(request.plan), catalog_list, false)
278 .await
279 .context(DecodeLogicalPlanSnafu)?;
280
281 self.inner
282 .handle_read(
283 QueryRequest {
284 header: request.header,
285 region_id,
286 plan,
287 },
288 query_ctx,
289 )
290 .await
291 }
292
293 #[tracing::instrument(skip_all)]
294 pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
295 let _permit = if let Some(p) = &self.inner.parallelism {
296 Some(p.acquire().await?)
297 } else {
298 None
299 };
300
301 let ctx = request.header.as_ref().map(|h| h.into());
302 let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
303
304 let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
305 .context(DataFusionSnafu)?;
306 let mut injector = injector_builder
307 .build(self, request.region_id, query_ctx.clone())
308 .await?;
309
310 let plan = request
311 .plan
312 .rewrite(&mut injector)
313 .context(DataFusionSnafu)?
314 .data;
315
316 self.inner
317 .handle_read(QueryRequest { plan, ..request }, query_ctx)
318 .await
319 }
320
321 pub fn reportable_regions(&self) -> Vec<RegionStat> {
325 self.inner
326 .region_map
327 .iter()
328 .filter_map(|e| {
329 let region_id = *e.key();
330 e.role(region_id).map(|role| RegionStat {
332 region_id,
333 engine: e.value().name().to_string(),
334 role,
335 })
336 })
337 .collect()
338 }
339
340 pub fn topic_stats(&self) -> Vec<TopicStat> {
342 let mut reporter = self.inner.topic_stats_reporter.write().unwrap();
343 let Some(reporter) = reporter.as_mut() else {
344 return vec![];
345 };
346 reporter
347 .reportable_topics()
348 .into_iter()
349 .map(|stat| TopicStat {
350 topic_name: stat.topic,
351 record_size: stat.record_size,
352 record_num: stat.record_num,
353 latest_entry_id: stat.latest_entry_id,
354 })
355 .collect()
356 }
357
358 pub fn is_region_leader(&self, region_id: RegionId) -> Option<bool> {
359 self.inner.region_map.get(®ion_id).and_then(|engine| {
360 engine.role(region_id).map(|role| match role {
361 RegionRole::Follower => false,
362 RegionRole::Leader => true,
363 RegionRole::DowngradingLeader => true,
364 })
365 })
366 }
367
368 pub fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
369 let engine = self
370 .inner
371 .region_map
372 .get(®ion_id)
373 .with_context(|| RegionNotFoundSnafu { region_id })?;
374 engine
375 .set_region_role(region_id, role)
376 .with_context(|_| HandleRegionRequestSnafu { region_id })
377 }
378
379 pub async fn set_region_role_state_gracefully(
389 &self,
390 region_id: RegionId,
391 state: SettableRegionRoleState,
392 ) -> Result<SetRegionRoleStateResponse> {
393 match self.inner.region_map.get(®ion_id) {
394 Some(engine) => Ok(engine
395 .set_region_role_state_gracefully(region_id, state)
396 .await
397 .with_context(|_| HandleRegionRequestSnafu { region_id })?),
398 None => Ok(SetRegionRoleStateResponse::NotFound),
399 }
400 }
401
    /// Returns a clone of the runtime stored in the inner state.
    pub fn runtime(&self) -> Runtime {
        self.inner.runtime.clone()
    }
405
406 pub fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
407 match self.inner.region_map.get(®ion_id) {
408 Some(e) => e.region_statistic(region_id),
409 None => None,
410 }
411 }
412
    /// Stops the region server by delegating to `RegionServerInner::stop`.
    pub async fn stop(&self) -> Result<()> {
        self.inner.stop().await
    }
417
418 #[cfg(test)]
419 pub(crate) fn register_test_region(&self, region_id: RegionId, engine: RegionEngineRef) {
421 {
422 let mut engines = self.inner.engines.write().unwrap();
423 if !engines.contains_key(engine.name()) {
424 debug!("Registering test engine: {}", engine.name());
425 engines.insert(engine.name().to_string(), engine.clone());
426 }
427 }
428
429 self.inner
430 .region_map
431 .insert(region_id, RegionEngineWithStatus::Ready(engine));
432 }
433
434 async fn handle_batch_ddl_requests(
435 &self,
436 request: region_request::Body,
437 ) -> Result<RegionResponse> {
438 let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
440 .context(BuildRegionRequestsSnafu)?
441 .unwrap();
442 let tracing_context = TracingContext::from_current_span();
443
444 let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
445 self.inner
446 .handle_batch_request(batch_request)
447 .trace(span)
448 .await
449 }
450
451 async fn handle_requests_in_parallel(
452 &self,
453 request: region_request::Body,
454 ) -> Result<RegionResponse> {
455 let requests =
456 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
457
458 match self.try_handle_metric_batch_puts(requests).await? {
461 Either::Left(response) => Ok(response),
462 Either::Right(requests) => {
463 let tracing_context = TracingContext::from_current_span();
465 let join_tasks =
466 requests
467 .into_iter()
468 .map(|(region_id, req): (RegionId, RegionRequest)| {
469 let self_to_move = self;
470 let span = tracing_context.attach(info_span!(
471 "RegionServer::handle_region_request",
472 region_id = region_id.to_string()
473 ));
474 async move {
475 self_to_move
476 .handle_request(region_id, req)
477 .trace(span)
478 .await
479 }
480 });
481
482 let results = try_join_all(join_tasks).await?;
483 let mut affected_rows = 0;
484 let mut extensions = HashMap::new();
485 for result in results {
486 affected_rows += result.affected_rows;
487 extensions.extend(result.extensions);
488 }
489
490 Ok(RegionResponse {
491 affected_rows,
492 extensions,
493 metadata: Vec::new(),
494 })
495 }
496 }
497 }
498
499 async fn handle_requests_in_serial(
500 &self,
501 request: region_request::Body,
502 ) -> Result<RegionResponse> {
503 let requests =
504 RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
505 let tracing_context = TracingContext::from_current_span();
506
507 let mut affected_rows = 0;
508 let mut extensions = HashMap::new();
509 for (region_id, req) in requests {
510 let span = tracing_context.attach(info_span!(
511 "RegionServer::handle_region_request",
512 region_id = region_id.to_string()
513 ));
514 let result = self.handle_request(region_id, req).trace(span).await?;
515
516 affected_rows += result.affected_rows;
517 extensions.extend(result.extensions);
518 }
519
520 Ok(RegionResponse {
521 affected_rows,
522 extensions,
523 metadata: Vec::new(),
524 })
525 }
526
527 async fn try_handle_metric_batch_puts(
534 &self,
535 requests: Vec<(RegionId, RegionRequest)>,
536 ) -> Result<Either<RegionResponse, Vec<(RegionId, RegionRequest)>>> {
537 if requests.is_empty() {
538 return Ok(Either::Right(requests));
539 }
540
541 if !matches!(requests[0].1, RegionRequest::Put(_)) {
543 return Ok(Either::Right(requests));
544 }
545 let first_region_id = requests[0].0;
546 let request_type = requests[0].1.request_type();
547
548 let engine = match self
552 .inner
553 .get_engine(first_region_id, &RegionChange::None)?
554 {
555 CurrentEngine::Engine(e) => e,
556 _ => return Ok(Either::Right(requests)),
557 };
558
559 if engine.name() != METRIC_ENGINE_NAME {
560 return Ok(Either::Right(requests));
561 }
562
563 let mut all_puts = true;
565 for (_, req) in &requests {
566 if !matches!(req, RegionRequest::Put(_)) {
567 all_puts = false;
568 break;
569 }
570 }
571
572 if !all_puts {
573 return Ok(Either::Right(requests));
574 }
575
576 let put_requests = requests.into_iter().map(|(region_id, req)| {
578 if let RegionRequest::Put(put) = req {
579 (region_id, put)
580 } else {
581 unreachable!("Already checked all are Put")
582 }
583 });
584
585 let metric_engine =
587 engine
588 .as_any()
589 .downcast_ref::<MetricEngine>()
590 .context(UnexpectedSnafu {
591 violated: "Failed to downcast to MetricEngine",
592 })?;
593
594 let tracing_context = TracingContext::from_current_span();
595 let batch_size = put_requests.len();
596 let span = tracing_context.attach(info_span!(
597 "RegionServer::handle_metric_batch_puts",
598 batch_size = batch_size,
599 ));
600 let result = metric_engine
601 .put_regions_batch(put_requests)
602 .trace(span)
603 .await
604 .map_err(BoxedError::new)
605 .context(HandleRegionRequestSnafu {
606 region_id: first_region_id,
607 });
608
609 match result {
610 Ok(total_affected) => {
611 crate::metrics::REGION_CHANGED_ROW_COUNT
612 .with_label_values(&[request_type])
613 .inc_by(total_affected as u64);
614 Ok(Either::Left(RegionResponse::new(total_affected)))
615 }
616 Err(err) => {
617 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
618 .with_label_values(&[request_type])
619 .inc_by(batch_size as u64);
620 Err(err)
621 }
622 }
623 }
624
625 async fn handle_sync_region_request(&self, request: &SyncRequest) -> Result<RegionResponse> {
626 let region_id = RegionId::from_u64(request.region_id);
627 let manifest_info = request
628 .manifest_info
629 .context(error::MissingRequiredFieldSnafu {
630 name: "manifest_info",
631 })?;
632
633 let manifest_info = match manifest_info {
634 ManifestInfo::MitoManifestInfo(info) => {
635 RegionManifestInfo::mito(info.data_manifest_version, 0, 0)
636 }
637 ManifestInfo::MetricManifestInfo(info) => RegionManifestInfo::metric(
638 info.data_manifest_version,
639 0,
640 info.metadata_manifest_version,
641 0,
642 ),
643 };
644
645 let tracing_context = TracingContext::from_current_span();
646 let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
647
648 self.sync_region(
649 region_id,
650 SyncRegionFromRequest::from_manifest(manifest_info),
651 )
652 .trace(span)
653 .await
654 .map(|_| RegionResponse::new(AffectedRows::default()))
655 }
656
657 #[tracing::instrument(skip_all)]
662 async fn handle_list_metadata_request(
663 &self,
664 request: &ListMetadataRequest,
665 ) -> Result<RegionResponse> {
666 let mut region_metadatas = Vec::new();
667 for region_id in &request.region_ids {
669 let region_id = RegionId::from_u64(*region_id);
670 let Some(engine) = self.find_engine(region_id)? else {
672 region_metadatas.push(None);
673 continue;
674 };
675
676 match engine.get_metadata(region_id).await {
677 Ok(metadata) => region_metadatas.push(Some(metadata)),
678 Err(err) => {
679 if err.status_code() == StatusCode::RegionNotFound {
680 region_metadatas.push(None);
681 } else {
682 Err(err).with_context(|_| GetRegionMetadataSnafu {
683 engine: engine.name(),
684 region_id,
685 })?;
686 }
687 }
688 }
689 }
690
691 let json_result = serde_json::to_vec(®ion_metadatas).context(SerializeJsonSnafu)?;
693
694 let response = RegionResponse::from_metadata(json_result);
695
696 Ok(response)
697 }
698
699 pub async fn sync_region(
701 &self,
702 region_id: RegionId,
703 request: SyncRegionFromRequest,
704 ) -> Result<()> {
705 let engine_with_status = self
706 .inner
707 .region_map
708 .get(®ion_id)
709 .with_context(|| RegionNotFoundSnafu { region_id })?;
710
711 self.inner
712 .handle_sync_region(engine_with_status.engine(), region_id, request)
713 .await
714 }
715
716 pub async fn remap_manifests(
718 &self,
719 request: RemapManifestsRequest,
720 ) -> Result<RemapManifestsResponse> {
721 let region_id = request.region_id;
722 let engine_with_status = self
723 .inner
724 .region_map
725 .get(®ion_id)
726 .with_context(|| RegionNotFoundSnafu { region_id })?;
727
728 engine_with_status
729 .engine()
730 .remap_manifests(request)
731 .await
732 .with_context(|_| HandleRegionRequestSnafu { region_id })
733 }
734
    /// Reads the suspend flag with relaxed ordering.
    fn is_suspended(&self) -> bool {
        self.suspend.load(Ordering::Relaxed)
    }
738
    /// Returns a shared handle to the suspend flag, so it can be toggled from outside
    /// this type.
    pub(crate) fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }
742}
743
744#[async_trait]
745impl RegionServerHandler for RegionServer {
746 async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
747 let failed_requests_cnt = crate::metrics::REGION_SERVER_REQUEST_FAILURE_COUNT
748 .with_label_values(&[request.as_ref()]);
749 let response = match &request {
750 region_request::Body::Creates(_)
751 | region_request::Body::Drops(_)
752 | region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
753 region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
754 self.handle_requests_in_parallel(request).await
755 }
756 region_request::Body::Sync(sync_request) => {
757 self.handle_sync_region_request(sync_request).await
758 }
759 region_request::Body::ListMetadata(list_metadata_request) => {
760 self.handle_list_metadata_request(list_metadata_request)
761 .await
762 }
763 _ => self.handle_requests_in_serial(request).await,
764 }
765 .map_err(BoxedError::new)
766 .inspect_err(|_| {
767 failed_requests_cnt.inc();
768 })
769 .context(ExecuteGrpcRequestSnafu)?;
770
771 Ok(RegionResponseV1 {
772 header: Some(ResponseHeader {
773 status: Some(Status {
774 status_code: StatusCode::Success as _,
775 ..Default::default()
776 }),
777 }),
778 affected_rows: response.affected_rows as _,
779 extensions: response.extensions,
780 metadata: response.metadata,
781 })
782 }
783}
784
785#[async_trait]
786impl FlightCraft for RegionServer {
787 async fn do_get(
788 &self,
789 request: Request<Ticket>,
790 ) -> TonicResult<Response<TonicStream<FlightData>>> {
791 ensure!(!self.is_suspended(), SuspendedSnafu);
792
793 let ticket = request.into_inner().ticket;
794 let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
795 .context(servers_error::InvalidFlightTicketSnafu)?;
796 let tracing_context = request
797 .header
798 .as_ref()
799 .map(|h| TracingContext::from_w3c(&h.tracing_context))
800 .unwrap_or_default();
801 let query_ctx = request
802 .header
803 .as_ref()
804 .map(|h| Arc::new(QueryContext::from(h)))
805 .unwrap_or(QueryContext::arc());
806
807 let result = self
808 .handle_remote_read(request, query_ctx.clone())
809 .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
810 .await?;
811
812 let stream = Box::pin(FlightRecordBatchStream::new(
813 result,
814 tracing_context,
815 self.flight_compression,
816 query_ctx,
817 ));
818 Ok(Response::new(stream))
819 }
820}
821
/// A region's engine together with the registration state of the region.
///
/// `RegionServerInner::get_engine` decides how a request is treated based on the variant.
#[derive(Clone)]
enum RegionEngineWithStatus {
    /// The region is being registered. Deregister requests get `RegionBusy`; requests
    /// that do not change registration get `RegionNotReady`.
    Registering(RegionEngineRef),
    /// The region is being deregistered. Register requests get `RegionBusy`; requests
    /// that do not change registration get `RegionNotFound`.
    Deregistering(RegionEngineRef),
    /// The region is ready; requests are routed to the contained engine.
    Ready(RegionEngineRef),
}
831
832impl RegionEngineWithStatus {
833 pub fn into_engine(self) -> RegionEngineRef {
835 match self {
836 RegionEngineWithStatus::Registering(engine) => engine,
837 RegionEngineWithStatus::Deregistering(engine) => engine,
838 RegionEngineWithStatus::Ready(engine) => engine,
839 }
840 }
841
842 pub fn engine(&self) -> &RegionEngineRef {
844 match self {
845 RegionEngineWithStatus::Registering(engine) => engine,
846 RegionEngineWithStatus::Deregistering(engine) => engine,
847 RegionEngineWithStatus::Ready(engine) => engine,
848 }
849 }
850}
851
852impl Deref for RegionEngineWithStatus {
853 type Target = RegionEngineRef;
854
855 fn deref(&self) -> &Self::Target {
856 match self {
857 RegionEngineWithStatus::Registering(engine) => engine,
858 RegionEngineWithStatus::Deregistering(engine) => engine,
859 RegionEngineWithStatus::Ready(engine) => engine,
860 }
861 }
862}
863
/// State shared by all clones of `RegionServer`.
struct RegionServerInner {
    /// Registered engines, keyed by engine name.
    engines: RwLock<HashMap<String, RegionEngineRef>>,
    /// Engine and registration status for each known region.
    region_map: DashMap<RegionId, RegionEngineWithStatus>,
    /// Query engine used to obtain plan decoders for remote reads.
    query_engine: QueryEngineRef,
    /// Runtime handed out by `RegionServer::runtime`.
    runtime: Runtime,
    /// Listener for region server events.
    event_listener: RegionServerEventListenerRef,
    /// Factory used to create table providers for regions.
    table_provider_factory: TableProviderFactoryRef,
    /// Optional concurrent query limiter; `None` means reads never wait for a permit.
    parallelism: Option<RegionServerParallelism>,
    /// Optional source of topic statistics, read by `RegionServer::topic_stats`.
    topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
    /// Handle to the mito engine, recorded by `register_engine` when the engine named
    /// `MITO_ENGINE_NAME` downcasts to `MitoEngine`.
    mito_engine: RwLock<Option<MitoEngine>>,
}
881
/// Limits the number of concurrently running queries.
struct RegionServerParallelism {
    /// Semaphore whose permits bound the number of concurrent queries.
    semaphore: Semaphore,
    /// Maximum time `acquire` waits for a permit.
    timeout: Duration,
}
886
887impl RegionServerParallelism {
888 pub fn from_opts(
889 max_concurrent_queries: usize,
890 concurrent_query_limiter_timeout: Duration,
891 ) -> Option<Self> {
892 if max_concurrent_queries == 0 {
893 return None;
894 }
895 Some(RegionServerParallelism {
896 semaphore: Semaphore::new(max_concurrent_queries),
897 timeout: concurrent_query_limiter_timeout,
898 })
899 }
900
901 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
902 timeout(self.timeout, self.semaphore.acquire())
903 .await
904 .context(ConcurrentQueryLimiterTimeoutSnafu)?
905 .context(ConcurrentQueryLimiterClosedSnafu)
906 }
907}
908
/// Outcome of resolving the engine for a region request.
enum CurrentEngine {
    /// The engine that should handle the request.
    Engine(RegionEngineRef),
    /// No engine is needed; the contained affected-rows count is the result.
    EarlyReturn(AffectedRows),
}
913
914impl Debug for CurrentEngine {
915 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
916 match self {
917 CurrentEngine::Engine(engine) => f
918 .debug_struct("CurrentEngine")
919 .field("engine", &engine.name())
920 .finish(),
921 CurrentEngine::EarlyReturn(rows) => f
922 .debug_struct("CurrentEngine")
923 .field("return", rows)
924 .finish(),
925 }
926 }
927}
928
929impl RegionServerInner {
930 pub fn new(
931 query_engine: QueryEngineRef,
932 runtime: Runtime,
933 event_listener: RegionServerEventListenerRef,
934 table_provider_factory: TableProviderFactoryRef,
935 parallelism: Option<RegionServerParallelism>,
936 ) -> Self {
937 Self {
938 engines: RwLock::new(HashMap::new()),
939 region_map: DashMap::new(),
940 query_engine,
941 runtime,
942 event_listener,
943 table_provider_factory,
944 parallelism,
945 topic_stats_reporter: RwLock::new(None),
946 mito_engine: RwLock::new(None),
947 }
948 }
949
950 pub fn register_engine(&self, engine: RegionEngineRef) {
951 let engine_name = engine.name();
952 if engine_name == MITO_ENGINE_NAME
953 && let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
954 {
955 *self.mito_engine.write().unwrap() = Some(mito_engine.clone());
956 }
957
958 info!("Region Engine {engine_name} is registered");
959 self.engines
960 .write()
961 .unwrap()
962 .insert(engine_name.to_string(), engine);
963 }
964
    /// Stores `topic_stats_reporter`, replacing any reporter set before, and logs the
    /// change.
    pub fn set_topic_stats_reporter(&self, topic_stats_reporter: Box<dyn TopicStatsReporter>) {
        info!("Set topic stats reporter");
        *self.topic_stats_reporter.write().unwrap() = Some(topic_stats_reporter);
    }
969
970 fn get_engine(
971 &self,
972 region_id: RegionId,
973 region_change: &RegionChange,
974 ) -> Result<CurrentEngine> {
975 let current_region_status = self.region_map.get(®ion_id);
976
977 let engine = match region_change {
978 RegionChange::Register(attribute) => match current_region_status {
979 Some(status) => match status.clone() {
980 RegionEngineWithStatus::Registering(engine) => engine,
981 RegionEngineWithStatus::Deregistering(_) => {
982 return error::RegionBusySnafu { region_id }.fail();
983 }
984 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
985 },
986 _ => self
987 .engines
988 .read()
989 .unwrap()
990 .get(attribute.engine())
991 .with_context(|| RegionEngineNotFoundSnafu {
992 name: attribute.engine(),
993 })?
994 .clone(),
995 },
996 RegionChange::Deregisters => match current_region_status {
997 Some(status) => match status.clone() {
998 RegionEngineWithStatus::Registering(_) => {
999 return error::RegionBusySnafu { region_id }.fail();
1000 }
1001 RegionEngineWithStatus::Deregistering(_) => {
1002 return Ok(CurrentEngine::EarlyReturn(0));
1003 }
1004 RegionEngineWithStatus::Ready(_) => status.clone().into_engine(),
1005 },
1006 None => return Ok(CurrentEngine::EarlyReturn(0)),
1007 },
1008 RegionChange::None | RegionChange::Catchup | RegionChange::Ingest => {
1009 match current_region_status {
1010 Some(status) => match status.clone() {
1011 RegionEngineWithStatus::Registering(_) => {
1012 return error::RegionNotReadySnafu { region_id }.fail();
1013 }
1014 RegionEngineWithStatus::Deregistering(_) => {
1015 return error::RegionNotFoundSnafu { region_id }.fail();
1016 }
1017 RegionEngineWithStatus::Ready(engine) => engine,
1018 },
1019 None => return error::RegionNotFoundSnafu { region_id }.fail(),
1020 }
1021 }
1022 };
1023
1024 Ok(CurrentEngine::Engine(engine))
1025 }
1026
1027 async fn handle_batch_open_requests_inner(
1028 &self,
1029 engine: RegionEngineRef,
1030 parallelism: usize,
1031 requests: Vec<(RegionId, RegionOpenRequest)>,
1032 ignore_nonexistent_region: bool,
1033 ) -> Result<Vec<RegionId>> {
1034 let region_changes = requests
1035 .iter()
1036 .map(|(region_id, open)| {
1037 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1038 Ok((*region_id, RegionChange::Register(attribute)))
1039 })
1040 .collect::<Result<HashMap<_, _>>>()?;
1041
1042 for (®ion_id, region_change) in ®ion_changes {
1043 self.set_region_status_not_ready(region_id, &engine, region_change)
1044 }
1045
1046 let mut open_regions = Vec::with_capacity(requests.len());
1047 let mut errors = vec![];
1048 match engine
1049 .handle_batch_open_requests(parallelism, requests)
1050 .await
1051 .with_context(|_| HandleBatchOpenRequestSnafu)
1052 {
1053 Ok(results) => {
1054 for (region_id, result) in results {
1055 let region_change = ®ion_changes[®ion_id];
1056 match result {
1057 Ok(_) => {
1058 if let Err(e) = self
1059 .set_region_status_ready(region_id, engine.clone(), *region_change)
1060 .await
1061 {
1062 error!(e; "Failed to set region to ready: {}", region_id);
1063 errors.push(BoxedError::new(e));
1064 } else {
1065 open_regions.push(region_id)
1066 }
1067 }
1068 Err(e) => {
1069 self.unset_region_status(region_id, &engine, *region_change);
1070 if e.status_code() == StatusCode::RegionNotFound
1071 && ignore_nonexistent_region
1072 {
1073 warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
1074 } else {
1075 error!(e; "Failed to open region: {}", region_id);
1076 errors.push(e);
1077 }
1078 }
1079 }
1080 }
1081 }
1082 Err(e) => {
1083 for (®ion_id, region_change) in ®ion_changes {
1084 self.unset_region_status(region_id, &engine, *region_change);
1085 }
1086 error!(e; "Failed to open batch regions");
1087 errors.push(BoxedError::new(e));
1088 }
1089 }
1090
1091 if !errors.is_empty() {
1092 return error::UnexpectedSnafu {
1093 violated: format!("Failed to open batch regions: {:?}", errors[0]),
1095 }
1096 .fail();
1097 }
1098
1099 Ok(open_regions)
1100 }
1101
1102 pub async fn handle_batch_open_requests(
1103 &self,
1104 parallelism: usize,
1105 requests: Vec<(RegionId, RegionOpenRequest)>,
1106 ignore_nonexistent_region: bool,
1107 ) -> Result<Vec<RegionId>> {
1108 let mut engine_grouped_requests: HashMap<String, Vec<_>> =
1109 HashMap::with_capacity(requests.len());
1110 for (region_id, request) in requests {
1111 if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
1112 requests.push((region_id, request));
1113 } else {
1114 engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
1115 }
1116 }
1117
1118 let mut results = Vec::with_capacity(engine_grouped_requests.keys().len());
1119 for (engine, requests) in engine_grouped_requests {
1120 let engine = self
1121 .engines
1122 .read()
1123 .unwrap()
1124 .get(&engine)
1125 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1126 .clone();
1127 results.push(
1128 self.handle_batch_open_requests_inner(
1129 engine,
1130 parallelism,
1131 requests,
1132 ignore_nonexistent_region,
1133 )
1134 .await,
1135 )
1136 }
1137
1138 Ok(results
1139 .into_iter()
1140 .collect::<Result<Vec<_>>>()?
1141 .into_iter()
1142 .flatten()
1143 .collect::<Vec<_>>())
1144 }
1145
1146 pub async fn handle_batch_catchup_requests_inner(
1147 &self,
1148 engine: RegionEngineRef,
1149 parallelism: usize,
1150 requests: Vec<(RegionId, RegionCatchupRequest)>,
1151 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1152 for (region_id, _) in &requests {
1153 self.set_region_status_not_ready(*region_id, &engine, &RegionChange::Catchup);
1154 }
1155 let region_ids = requests
1156 .iter()
1157 .map(|(region_id, _)| *region_id)
1158 .collect::<Vec<_>>();
1159 let mut responses = Vec::with_capacity(requests.len());
1160 match engine
1161 .handle_batch_catchup_requests(parallelism, requests)
1162 .await
1163 {
1164 Ok(results) => {
1165 for (region_id, result) in results {
1166 match result {
1167 Ok(_) => {
1168 if let Err(e) = self
1169 .set_region_status_ready(
1170 region_id,
1171 engine.clone(),
1172 RegionChange::Catchup,
1173 )
1174 .await
1175 {
1176 error!(e; "Failed to set region to ready: {}", region_id);
1177 responses.push((region_id, Err(BoxedError::new(e))));
1178 } else {
1179 responses.push((region_id, Ok(())));
1180 }
1181 }
1182 Err(e) => {
1183 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1184 error!(e; "Failed to catchup region: {}", region_id);
1185 responses.push((region_id, Err(e)));
1186 }
1187 }
1188 }
1189 }
1190 Err(e) => {
1191 for region_id in region_ids {
1192 self.unset_region_status(region_id, &engine, RegionChange::Catchup);
1193 }
1194 error!(e; "Failed to catchup batch regions");
1195 return error::UnexpectedSnafu {
1196 violated: format!("Failed to catchup batch regions: {:?}", e),
1197 }
1198 .fail();
1199 }
1200 }
1201
1202 Ok(responses)
1203 }
1204
1205 pub async fn handle_batch_catchup_requests(
1206 &self,
1207 parallelism: usize,
1208 requests: Vec<(RegionId, RegionCatchupRequest)>,
1209 ) -> Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>> {
1210 let mut engine_grouped_requests: HashMap<String, Vec<_>> = HashMap::new();
1211
1212 let mut responses = Vec::with_capacity(requests.len());
1213 for (region_id, request) in requests {
1214 if let Ok(engine) = self.get_engine(region_id, &RegionChange::Catchup) {
1215 match engine {
1216 CurrentEngine::Engine(engine) => {
1217 engine_grouped_requests
1218 .entry(engine.name().to_string())
1219 .or_default()
1220 .push((region_id, request));
1221 }
1222 CurrentEngine::EarlyReturn(_) => {
1223 return error::UnexpectedSnafu {
1224 violated: format!("Unexpected engine type for region {}", region_id),
1225 }
1226 .fail();
1227 }
1228 }
1229 } else {
1230 responses.push((
1231 region_id,
1232 Err(BoxedError::new(
1233 error::RegionNotFoundSnafu { region_id }.build(),
1234 )),
1235 ));
1236 }
1237 }
1238
1239 for (engine, requests) in engine_grouped_requests {
1240 let engine = self
1241 .engines
1242 .read()
1243 .unwrap()
1244 .get(&engine)
1245 .with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
1246 .clone();
1247 responses.extend(
1248 self.handle_batch_catchup_requests_inner(engine, parallelism, requests)
1249 .await?,
1250 );
1251 }
1252
1253 Ok(responses)
1254 }
1255
1256 pub async fn handle_batch_request(
1260 &self,
1261 batch_request: BatchRegionDdlRequest,
1262 ) -> Result<RegionResponse> {
1263 let region_changes = match &batch_request {
1264 BatchRegionDdlRequest::Create(requests) => requests
1265 .iter()
1266 .map(|(region_id, create)| {
1267 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1268 Ok((*region_id, RegionChange::Register(attribute)))
1269 })
1270 .collect::<Result<Vec<_>>>()?,
1271 BatchRegionDdlRequest::Drop(requests) => requests
1272 .iter()
1273 .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
1274 .collect::<Vec<_>>(),
1275 BatchRegionDdlRequest::Alter(requests) => requests
1276 .iter()
1277 .map(|(region_id, _)| (*region_id, RegionChange::None))
1278 .collect::<Vec<_>>(),
1279 };
1280
1281 let (first_region_id, first_region_change) = region_changes.first().unwrap();
1284 let engine = match self.get_engine(*first_region_id, first_region_change)? {
1285 CurrentEngine::Engine(engine) => engine,
1286 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1287 };
1288
1289 for (region_id, region_change) in region_changes.iter() {
1290 self.set_region_status_not_ready(*region_id, &engine, region_change);
1291 }
1292
1293 let ddl_type = batch_request.request_type();
1294 let result = engine
1295 .handle_batch_ddl_requests(batch_request)
1296 .await
1297 .context(HandleBatchDdlRequestSnafu { ddl_type });
1298
1299 match result {
1300 Ok(result) => {
1301 for (region_id, region_change) in ®ion_changes {
1302 self.set_region_status_ready(*region_id, engine.clone(), *region_change)
1303 .await?;
1304 }
1305
1306 Ok(RegionResponse {
1307 affected_rows: result.affected_rows,
1308 extensions: result.extensions,
1309 metadata: Vec::new(),
1310 })
1311 }
1312 Err(err) => {
1313 for (region_id, region_change) in region_changes {
1314 self.unset_region_status(region_id, &engine, region_change);
1315 }
1316
1317 Err(err)
1318 }
1319 }
1320 }
1321
1322 pub async fn handle_request(
1323 &self,
1324 region_id: RegionId,
1325 request: RegionRequest,
1326 ) -> Result<RegionResponse> {
1327 let request_type = request.request_type();
1328 let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
1329 .with_label_values(&[request_type])
1330 .start_timer();
1331
1332 let region_change = match &request {
1333 RegionRequest::Create(create) => {
1334 let attribute = parse_region_attribute(&create.engine, &create.options)?;
1335 RegionChange::Register(attribute)
1336 }
1337 RegionRequest::Open(open) => {
1338 let attribute = parse_region_attribute(&open.engine, &open.options)?;
1339 RegionChange::Register(attribute)
1340 }
1341 RegionRequest::Close(_) | RegionRequest::Drop(_) => RegionChange::Deregisters,
1342 RegionRequest::Put(_) | RegionRequest::Delete(_) | RegionRequest::BulkInserts(_) => {
1343 RegionChange::Ingest
1344 }
1345 RegionRequest::Alter(_)
1346 | RegionRequest::Flush(_)
1347 | RegionRequest::Compact(_)
1348 | RegionRequest::Truncate(_)
1349 | RegionRequest::BuildIndex(_)
1350 | RegionRequest::EnterStaging(_)
1351 | RegionRequest::ApplyStagingManifest(_) => RegionChange::None,
1352 RegionRequest::Catchup(_) => RegionChange::Catchup,
1353 };
1354
1355 let engine = match self.get_engine(region_id, ®ion_change)? {
1356 CurrentEngine::Engine(engine) => engine,
1357 CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
1358 };
1359
1360 self.set_region_status_not_ready(region_id, &engine, ®ion_change);
1362
1363 match engine
1364 .handle_request(region_id, request)
1365 .await
1366 .with_context(|_| HandleRegionRequestSnafu { region_id })
1367 {
1368 Ok(result) => {
1369 if matches!(region_change, RegionChange::Ingest) {
1371 crate::metrics::REGION_CHANGED_ROW_COUNT
1372 .with_label_values(&[request_type])
1373 .inc_by(result.affected_rows as u64);
1374 }
1375 self.set_region_status_ready(region_id, engine.clone(), region_change)
1377 .await?;
1378
1379 Ok(RegionResponse {
1380 affected_rows: result.affected_rows,
1381 extensions: result.extensions,
1382 metadata: Vec::new(),
1383 })
1384 }
1385 Err(err) => {
1386 if matches!(region_change, RegionChange::Ingest) {
1387 crate::metrics::REGION_SERVER_INSERT_FAIL_COUNT
1388 .with_label_values(&[request_type])
1389 .inc();
1390 }
1391 self.unset_region_status(region_id, &engine, region_change);
1393 Err(err)
1394 }
1395 }
1396 }
1397
1398 pub async fn handle_sync_region(
1400 &self,
1401 engine: &RegionEngineRef,
1402 region_id: RegionId,
1403 request: SyncRegionFromRequest,
1404 ) -> Result<()> {
1405 let Some(new_opened_regions) = engine
1406 .sync_region(region_id, request)
1407 .await
1408 .with_context(|_| HandleRegionRequestSnafu { region_id })?
1409 .new_opened_logical_region_ids()
1410 else {
1411 return Ok(());
1412 };
1413
1414 for region in new_opened_regions {
1415 self.region_map
1416 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1417 info!("Logical region {} is registered!", region);
1418 }
1419
1420 Ok(())
1421 }
1422
1423 fn set_region_status_not_ready(
1424 &self,
1425 region_id: RegionId,
1426 engine: &RegionEngineRef,
1427 region_change: &RegionChange,
1428 ) {
1429 match region_change {
1430 RegionChange::Register(_) => {
1431 self.region_map.insert(
1432 region_id,
1433 RegionEngineWithStatus::Registering(engine.clone()),
1434 );
1435 }
1436 RegionChange::Deregisters => {
1437 self.region_map.insert(
1438 region_id,
1439 RegionEngineWithStatus::Deregistering(engine.clone()),
1440 );
1441 }
1442 _ => {}
1443 }
1444 }
1445
1446 fn unset_region_status(
1447 &self,
1448 region_id: RegionId,
1449 engine: &RegionEngineRef,
1450 region_change: RegionChange,
1451 ) {
1452 match region_change {
1453 RegionChange::None | RegionChange::Ingest => {}
1454 RegionChange::Register(_) => {
1455 self.region_map.remove(®ion_id);
1456 }
1457 RegionChange::Deregisters => {
1458 self.region_map
1459 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1460 }
1461 RegionChange::Catchup => {}
1462 }
1463 }
1464
1465 async fn set_region_status_ready(
1466 &self,
1467 region_id: RegionId,
1468 engine: RegionEngineRef,
1469 region_change: RegionChange,
1470 ) -> Result<()> {
1471 let engine_type = engine.name();
1472 match region_change {
1473 RegionChange::None | RegionChange::Ingest => {}
1474 RegionChange::Register(attribute) => {
1475 info!(
1476 "Region {region_id} is registered to engine {}",
1477 attribute.engine()
1478 );
1479 self.region_map
1480 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1481
1482 match attribute {
1483 RegionAttribute::Metric { physical } => {
1484 if physical {
1485 self.register_logical_regions(&engine, region_id).await?;
1487 self.event_listener.on_region_registered(region_id);
1489 }
1490 }
1491 RegionAttribute::Mito => self.event_listener.on_region_registered(region_id),
1492 RegionAttribute::File => {
1493 }
1495 }
1496 }
1497 RegionChange::Deregisters => {
1498 info!("Region {region_id} is deregistered from engine {engine_type}");
1499 self.region_map
1500 .remove(®ion_id)
1501 .map(|(id, engine)| engine.set_region_role(id, RegionRole::Follower));
1502 self.event_listener.on_region_deregistered(region_id);
1503 }
1504 RegionChange::Catchup => {
1505 if is_metric_engine(engine.name()) {
1506 self.register_logical_regions(&engine, region_id).await?;
1508 }
1509 }
1510 }
1511 Ok(())
1512 }
1513
1514 async fn register_logical_regions(
1515 &self,
1516 engine: &RegionEngineRef,
1517 physical_region_id: RegionId,
1518 ) -> Result<()> {
1519 let metric_engine =
1520 engine
1521 .as_any()
1522 .downcast_ref::<MetricEngine>()
1523 .context(UnexpectedSnafu {
1524 violated: format!(
1525 "expecting engine type '{}', actual '{}'",
1526 METRIC_ENGINE_NAME,
1527 engine.name(),
1528 ),
1529 })?;
1530
1531 let logical_regions = metric_engine
1532 .logical_regions(physical_region_id)
1533 .await
1534 .context(FindLogicalRegionsSnafu { physical_region_id })?;
1535
1536 for region in logical_regions {
1537 self.region_map
1538 .insert(region, RegionEngineWithStatus::Ready(engine.clone()));
1539 info!("Logical region {} is registered!", region);
1540 }
1541 Ok(())
1542 }
1543
1544 pub async fn handle_read(
1545 &self,
1546 request: QueryRequest,
1547 query_ctx: QueryContextRef,
1548 ) -> Result<SendableRecordBatchStream> {
1549 let result = self
1552 .query_engine
1553 .execute(request.plan, query_ctx)
1554 .await
1555 .context(ExecuteLogicalPlanSnafu)?;
1556
1557 match result.data {
1558 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => {
1559 UnsupportedOutputSnafu { expected: "stream" }.fail()
1560 }
1561 OutputData::Stream(stream) => Ok(stream),
1562 }
1563 }
1564
1565 async fn stop(&self) -> Result<()> {
1566 let regions = self
1574 .region_map
1575 .iter()
1576 .map(|x| (*x.key(), x.value().clone()))
1577 .collect::<Vec<_>>();
1578 let num_regions = regions.len();
1579
1580 for (region_id, engine) in regions {
1581 let closed = engine
1582 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1583 .await;
1584 match closed {
1585 Ok(_) => debug!("Region {region_id} is closed"),
1586 Err(e) => warn!("Failed to close region {region_id}, err: {e}"),
1587 }
1588 }
1589 self.region_map.clear();
1590 info!("closed {num_regions} regions");
1591
1592 drop(self.mito_engine.write().unwrap().take());
1593 let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
1594 for (engine_name, engine) in engines {
1595 engine
1596 .stop()
1597 .await
1598 .context(StopRegionEngineSnafu { name: &engine_name })?;
1599 info!("Region engine {engine_name} is stopped");
1600 }
1601
1602 Ok(())
1603 }
1604}
1605
/// The kind of region-map transition a request implies for its region.
#[derive(Debug, Clone, Copy)]
enum RegionChange {
    /// The request does not change the region's registration
    /// (alter, flush, compact, truncate, build-index and staging requests).
    None,
    /// The region is being created or opened; carries the attribute parsed from the
    /// request's engine name and options.
    Register(RegionAttribute),
    /// The region is being closed or dropped.
    Deregisters,
    /// The region is catching up; for the metric engine its logical regions are
    /// re-registered once the catchup succeeds.
    Catchup,
    /// A data write (put, delete or bulk insert); only affects the ingest metrics.
    Ingest,
}
1614
1615fn is_metric_engine(engine: &str) -> bool {
1616 engine == METRIC_ENGINE_NAME
1617}
1618
1619fn parse_region_attribute(
1620 engine: &str,
1621 options: &HashMap<String, String>,
1622) -> Result<RegionAttribute> {
1623 match engine {
1624 MITO_ENGINE_NAME => Ok(RegionAttribute::Mito),
1625 METRIC_ENGINE_NAME => {
1626 let physical = !options.contains_key(LOGICAL_TABLE_METADATA_KEY);
1627
1628 Ok(RegionAttribute::Metric { physical })
1629 }
1630 FILE_ENGINE_NAME => Ok(RegionAttribute::File),
1631 _ => error::UnexpectedSnafu {
1632 violated: format!("Unknown engine: {}", engine),
1633 }
1634 .fail(),
1635 }
1636}
1637
/// Engine-specific attribute of a region, derived from the engine name and region options.
#[derive(Debug, Clone, Copy)]
enum RegionAttribute {
    /// A region served by the mito engine.
    Mito,
    /// A region served by the metric engine; `physical` is `true` unless the region options
    /// contain `LOGICAL_TABLE_METADATA_KEY`.
    Metric { physical: bool },
    /// A region served by the file engine.
    File,
}
1644
1645impl RegionAttribute {
1646 fn engine(&self) -> &'static str {
1647 match self {
1648 RegionAttribute::Mito => MITO_ENGINE_NAME,
1649 RegionAttribute::Metric { .. } => METRIC_ENGINE_NAME,
1650 RegionAttribute::File => FILE_ENGINE_NAME,
1651 }
1652 }
1653}
1654
1655#[cfg(test)]
1656mod tests {
1657
1658 use std::assert_matches::assert_matches;
1659
1660 use api::v1::SemanticType;
1661 use common_error::ext::ErrorExt;
1662 use datatypes::prelude::ConcreteDataType;
1663 use mito2::test_util::CreateRequestBuilder;
1664 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1665 use store_api::region_engine::RegionEngine;
1666 use store_api::region_request::{
1667 PathType, RegionDropRequest, RegionOpenRequest, RegionTruncateRequest,
1668 };
1669 use store_api::storage::RegionId;
1670
1671 use super::*;
1672 use crate::error::Result;
1673 use crate::tests::{MockRegionEngine, mock_region_server};
1674
1675 #[tokio::test]
1676 async fn test_region_registering() {
1677 common_telemetry::init_default_ut_logging();
1678
1679 let mut mock_region_server = mock_region_server();
1680 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1681 let engine_name = engine.name();
1682 mock_region_server.register_engine(engine.clone());
1683 let region_id = RegionId::new(1, 1);
1684 let builder = CreateRequestBuilder::new();
1685 let create_req = builder.build();
1686 mock_region_server.inner.region_map.insert(
1688 region_id,
1689 RegionEngineWithStatus::Registering(engine.clone()),
1690 );
1691 let response = mock_region_server
1692 .handle_request(region_id, RegionRequest::Create(create_req))
1693 .await
1694 .unwrap();
1695 assert_eq!(response.affected_rows, 0);
1696 let status = mock_region_server
1697 .inner
1698 .region_map
1699 .get(®ion_id)
1700 .unwrap()
1701 .clone();
1702 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1703
1704 mock_region_server.inner.region_map.insert(
1705 region_id,
1706 RegionEngineWithStatus::Registering(engine.clone()),
1707 );
1708 let response = mock_region_server
1709 .handle_request(
1710 region_id,
1711 RegionRequest::Open(RegionOpenRequest {
1712 engine: engine_name.to_string(),
1713 table_dir: String::new(),
1714 path_type: PathType::Bare,
1715 options: Default::default(),
1716 skip_wal_replay: false,
1717 checkpoint: None,
1718 }),
1719 )
1720 .await
1721 .unwrap();
1722 assert_eq!(response.affected_rows, 0);
1723 let status = mock_region_server
1724 .inner
1725 .region_map
1726 .get(®ion_id)
1727 .unwrap()
1728 .clone();
1729 assert!(matches!(status, RegionEngineWithStatus::Ready(_)));
1730 }
1731
1732 #[tokio::test]
1733 async fn test_region_deregistering() {
1734 common_telemetry::init_default_ut_logging();
1735
1736 let mut mock_region_server = mock_region_server();
1737 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1738
1739 mock_region_server.register_engine(engine.clone());
1740
1741 let region_id = RegionId::new(1, 1);
1742
1743 mock_region_server.inner.region_map.insert(
1745 region_id,
1746 RegionEngineWithStatus::Deregistering(engine.clone()),
1747 );
1748
1749 let response = mock_region_server
1750 .handle_request(
1751 region_id,
1752 RegionRequest::Drop(RegionDropRequest {
1753 fast_path: false,
1754 force: false,
1755 }),
1756 )
1757 .await
1758 .unwrap();
1759 assert_eq!(response.affected_rows, 0);
1760
1761 let status = mock_region_server
1762 .inner
1763 .region_map
1764 .get(®ion_id)
1765 .unwrap()
1766 .clone();
1767 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1768
1769 mock_region_server.inner.region_map.insert(
1770 region_id,
1771 RegionEngineWithStatus::Deregistering(engine.clone()),
1772 );
1773
1774 let response = mock_region_server
1775 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
1776 .await
1777 .unwrap();
1778 assert_eq!(response.affected_rows, 0);
1779
1780 let status = mock_region_server
1781 .inner
1782 .region_map
1783 .get(®ion_id)
1784 .unwrap()
1785 .clone();
1786 assert!(matches!(status, RegionEngineWithStatus::Deregistering(_)));
1787 }
1788
1789 #[tokio::test]
1790 async fn test_region_not_ready() {
1791 common_telemetry::init_default_ut_logging();
1792
1793 let mut mock_region_server = mock_region_server();
1794 let (engine, _receiver) = MockRegionEngine::new(MITO_ENGINE_NAME);
1795
1796 mock_region_server.register_engine(engine.clone());
1797
1798 let region_id = RegionId::new(1, 1);
1799
1800 mock_region_server.inner.region_map.insert(
1802 region_id,
1803 RegionEngineWithStatus::Registering(engine.clone()),
1804 );
1805
1806 let err = mock_region_server
1807 .handle_request(
1808 region_id,
1809 RegionRequest::Truncate(RegionTruncateRequest::All),
1810 )
1811 .await
1812 .unwrap_err();
1813
1814 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1815 }
1816
1817 #[tokio::test]
1818 async fn test_region_request_failed() {
1819 common_telemetry::init_default_ut_logging();
1820
1821 let mut mock_region_server = mock_region_server();
1822 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1823 MITO_ENGINE_NAME,
1824 Box::new(|_region_id, _request| {
1825 error::UnexpectedSnafu {
1826 violated: "test".to_string(),
1827 }
1828 .fail()
1829 }),
1830 );
1831
1832 mock_region_server.register_engine(engine.clone());
1833
1834 let region_id = RegionId::new(1, 1);
1835 let builder = CreateRequestBuilder::new();
1836 let create_req = builder.build();
1837 mock_region_server
1838 .handle_request(region_id, RegionRequest::Create(create_req))
1839 .await
1840 .unwrap_err();
1841
1842 let status = mock_region_server.inner.region_map.get(®ion_id);
1843 assert!(status.is_none());
1844
1845 mock_region_server
1846 .inner
1847 .region_map
1848 .insert(region_id, RegionEngineWithStatus::Ready(engine.clone()));
1849
1850 mock_region_server
1851 .handle_request(
1852 region_id,
1853 RegionRequest::Drop(RegionDropRequest {
1854 fast_path: false,
1855 force: false,
1856 }),
1857 )
1858 .await
1859 .unwrap_err();
1860
1861 let status = mock_region_server.inner.region_map.get(®ion_id);
1862 assert!(status.is_some());
1863 }
1864
1865 #[tokio::test]
1866 async fn test_batch_open_region_ignore_nonexistent_regions() {
1867 common_telemetry::init_default_ut_logging();
1868 let mut mock_region_server = mock_region_server();
1869 let (engine, _receiver) = MockRegionEngine::with_mock_fn(
1870 MITO_ENGINE_NAME,
1871 Box::new(|region_id, _request| {
1872 if region_id == RegionId::new(1, 1) {
1873 error::RegionNotFoundSnafu { region_id }.fail()
1874 } else {
1875 Ok(0)
1876 }
1877 }),
1878 );
1879 mock_region_server.register_engine(engine.clone());
1880
1881 let region_ids = mock_region_server
1882 .handle_batch_open_requests(
1883 8,
1884 vec![
1885 (
1886 RegionId::new(1, 1),
1887 RegionOpenRequest {
1888 engine: MITO_ENGINE_NAME.to_string(),
1889 table_dir: String::new(),
1890 path_type: PathType::Bare,
1891 options: Default::default(),
1892 skip_wal_replay: false,
1893 checkpoint: None,
1894 },
1895 ),
1896 (
1897 RegionId::new(1, 2),
1898 RegionOpenRequest {
1899 engine: MITO_ENGINE_NAME.to_string(),
1900 table_dir: String::new(),
1901 path_type: PathType::Bare,
1902 options: Default::default(),
1903 skip_wal_replay: false,
1904 checkpoint: None,
1905 },
1906 ),
1907 ],
1908 true,
1909 )
1910 .await
1911 .unwrap();
1912 assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
1913
1914 let err = mock_region_server
1915 .handle_batch_open_requests(
1916 8,
1917 vec![
1918 (
1919 RegionId::new(1, 1),
1920 RegionOpenRequest {
1921 engine: MITO_ENGINE_NAME.to_string(),
1922 table_dir: String::new(),
1923 path_type: PathType::Bare,
1924 options: Default::default(),
1925 skip_wal_replay: false,
1926 checkpoint: None,
1927 },
1928 ),
1929 (
1930 RegionId::new(1, 2),
1931 RegionOpenRequest {
1932 engine: MITO_ENGINE_NAME.to_string(),
1933 table_dir: String::new(),
1934 path_type: PathType::Bare,
1935 options: Default::default(),
1936 skip_wal_replay: false,
1937 checkpoint: None,
1938 },
1939 ),
1940 ],
1941 false,
1942 )
1943 .await
1944 .unwrap_err();
1945 assert_eq!(err.status_code(), StatusCode::Unexpected);
1946 }
1947
/// One table-driven case for `test_current_engine`.
struct CurrentEngineTest {
    /// Region to look up.
    region_id: RegionId,
    /// Status to install in the region map before the lookup; `None` removes the entry.
    current_region_status: Option<RegionEngineWithStatus>,
    /// Change passed to `get_engine`.
    region_change: RegionChange,
    /// Assertion run against the result of `get_engine`.
    assert: Box<dyn FnOnce(Result<CurrentEngine>)>,
}
1954
1955 #[tokio::test]
1956 async fn test_current_engine() {
1957 common_telemetry::init_default_ut_logging();
1958
1959 let mut mock_region_server = mock_region_server();
1960 let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
1961 mock_region_server.register_engine(engine.clone());
1962
1963 let region_id = RegionId::new(1024, 1);
1964 let tests = vec![
1965 CurrentEngineTest {
1967 region_id,
1968 current_region_status: None,
1969 region_change: RegionChange::None,
1970 assert: Box::new(|result| {
1971 let err = result.unwrap_err();
1972 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
1973 }),
1974 },
1975 CurrentEngineTest {
1976 region_id,
1977 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
1978 region_change: RegionChange::None,
1979 assert: Box::new(|result| {
1980 let current_engine = result.unwrap();
1981 assert_matches!(current_engine, CurrentEngine::Engine(_));
1982 }),
1983 },
1984 CurrentEngineTest {
1985 region_id,
1986 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
1987 region_change: RegionChange::None,
1988 assert: Box::new(|result| {
1989 let err = result.unwrap_err();
1990 assert_eq!(err.status_code(), StatusCode::RegionNotReady);
1991 }),
1992 },
1993 CurrentEngineTest {
1994 region_id,
1995 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
1996 region_change: RegionChange::None,
1997 assert: Box::new(|result| {
1998 let err = result.unwrap_err();
1999 assert_eq!(err.status_code(), StatusCode::RegionNotFound);
2000 }),
2001 },
2002 CurrentEngineTest {
2004 region_id,
2005 current_region_status: None,
2006 region_change: RegionChange::Register(RegionAttribute::Mito),
2007 assert: Box::new(|result| {
2008 let current_engine = result.unwrap();
2009 assert_matches!(current_engine, CurrentEngine::Engine(_));
2010 }),
2011 },
2012 CurrentEngineTest {
2013 region_id,
2014 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2015 region_change: RegionChange::Register(RegionAttribute::Mito),
2016 assert: Box::new(|result| {
2017 let current_engine = result.unwrap();
2018 assert_matches!(current_engine, CurrentEngine::Engine(_));
2019 }),
2020 },
2021 CurrentEngineTest {
2022 region_id,
2023 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2024 region_change: RegionChange::Register(RegionAttribute::Mito),
2025 assert: Box::new(|result| {
2026 let err = result.unwrap_err();
2027 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2028 }),
2029 },
2030 CurrentEngineTest {
2031 region_id,
2032 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2033 region_change: RegionChange::Register(RegionAttribute::Mito),
2034 assert: Box::new(|result| {
2035 let current_engine = result.unwrap();
2036 assert_matches!(current_engine, CurrentEngine::Engine(_));
2037 }),
2038 },
2039 CurrentEngineTest {
2041 region_id,
2042 current_region_status: None,
2043 region_change: RegionChange::Deregisters,
2044 assert: Box::new(|result| {
2045 let current_engine = result.unwrap();
2046 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2047 }),
2048 },
2049 CurrentEngineTest {
2050 region_id,
2051 current_region_status: Some(RegionEngineWithStatus::Registering(engine.clone())),
2052 region_change: RegionChange::Deregisters,
2053 assert: Box::new(|result| {
2054 let err = result.unwrap_err();
2055 assert_eq!(err.status_code(), StatusCode::RegionBusy);
2056 }),
2057 },
2058 CurrentEngineTest {
2059 region_id,
2060 current_region_status: Some(RegionEngineWithStatus::Deregistering(engine.clone())),
2061 region_change: RegionChange::Deregisters,
2062 assert: Box::new(|result| {
2063 let current_engine = result.unwrap();
2064 assert_matches!(current_engine, CurrentEngine::EarlyReturn(_));
2065 }),
2066 },
2067 CurrentEngineTest {
2068 region_id,
2069 current_region_status: Some(RegionEngineWithStatus::Ready(engine.clone())),
2070 region_change: RegionChange::Deregisters,
2071 assert: Box::new(|result| {
2072 let current_engine = result.unwrap();
2073 assert_matches!(current_engine, CurrentEngine::Engine(_));
2074 }),
2075 },
2076 ];
2077
2078 for test in tests {
2079 let CurrentEngineTest {
2080 region_id,
2081 current_region_status,
2082 region_change,
2083 assert,
2084 } = test;
2085
2086 if let Some(status) = current_region_status {
2088 mock_region_server
2089 .inner
2090 .region_map
2091 .insert(region_id, status);
2092 } else {
2093 mock_region_server.inner.region_map.remove(®ion_id);
2094 }
2095
2096 let result = mock_region_server
2097 .inner
2098 .get_engine(region_id, ®ion_change);
2099
2100 assert(result);
2101 }
2102 }
2103
2104 #[tokio::test]
2105 async fn test_region_server_parallelism() {
2106 let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
2107 let first_query = p.acquire().await;
2108 assert!(first_query.is_ok());
2109 let second_query = p.acquire().await;
2110 assert!(second_query.is_ok());
2111 let third_query = p.acquire().await;
2112 assert!(third_query.is_err());
2113 let err = third_query.unwrap_err();
2114 assert_eq!(
2115 err.output_msg(),
2116 "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
2117 );
2118 drop(first_query);
2119 let forth_query = p.acquire().await;
2120 assert!(forth_query.is_ok());
2121 }
2122
2123 fn mock_region_metadata(region_id: RegionId) -> RegionMetadata {
2124 let mut metadata_builder = RegionMetadataBuilder::new(region_id);
2125 metadata_builder.push_column_metadata(ColumnMetadata {
2126 column_schema: datatypes::schema::ColumnSchema::new(
2127 "timestamp",
2128 ConcreteDataType::timestamp_nanosecond_datatype(),
2129 false,
2130 ),
2131 semantic_type: SemanticType::Timestamp,
2132 column_id: 0,
2133 });
2134 metadata_builder.push_column_metadata(ColumnMetadata {
2135 column_schema: datatypes::schema::ColumnSchema::new(
2136 "file",
2137 ConcreteDataType::string_datatype(),
2138 true,
2139 ),
2140 semantic_type: SemanticType::Tag,
2141 column_id: 1,
2142 });
2143 metadata_builder.push_column_metadata(ColumnMetadata {
2144 column_schema: datatypes::schema::ColumnSchema::new(
2145 "message",
2146 ConcreteDataType::string_datatype(),
2147 true,
2148 ),
2149 semantic_type: SemanticType::Field,
2150 column_id: 2,
2151 });
2152 metadata_builder.primary_key(vec![1]);
2153 metadata_builder.build().unwrap()
2154 }
2155
2156 #[tokio::test]
2157 async fn test_handle_list_metadata_request() {
2158 common_telemetry::init_default_ut_logging();
2159
2160 let mut mock_region_server = mock_region_server();
2161 let region_id_1 = RegionId::new(1, 0);
2162 let region_id_2 = RegionId::new(2, 0);
2163
2164 let metadata_1 = mock_region_metadata(region_id_1);
2165 let metadata_2 = mock_region_metadata(region_id_2);
2166 let metadatas = vec![Some(metadata_1.clone()), Some(metadata_2.clone())];
2167
2168 let metadata_1 = Arc::new(metadata_1);
2169 let metadata_2 = Arc::new(metadata_2);
2170 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2171 MITO_ENGINE_NAME,
2172 Box::new(move |region_id| {
2173 if region_id == region_id_1 {
2174 Ok(metadata_1.clone())
2175 } else if region_id == region_id_2 {
2176 Ok(metadata_2.clone())
2177 } else {
2178 error::RegionNotFoundSnafu { region_id }.fail()
2179 }
2180 }),
2181 );
2182
2183 mock_region_server.register_engine(engine.clone());
2184 mock_region_server
2185 .inner
2186 .region_map
2187 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2188 mock_region_server
2189 .inner
2190 .region_map
2191 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2192
2193 let list_metadata_request = ListMetadataRequest {
2195 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2196 };
2197 let response = mock_region_server
2198 .handle_list_metadata_request(&list_metadata_request)
2199 .await
2200 .unwrap();
2201 let decoded_metadata: Vec<Option<RegionMetadata>> =
2202 serde_json::from_slice(&response.metadata).unwrap();
2203 assert_eq!(metadatas, decoded_metadata);
2204 }
2205
2206 #[tokio::test]
2207 async fn test_handle_list_metadata_not_found() {
2208 common_telemetry::init_default_ut_logging();
2209
2210 let mut mock_region_server = mock_region_server();
2211 let region_id_1 = RegionId::new(1, 0);
2212 let region_id_2 = RegionId::new(2, 0);
2213
2214 let metadata_1 = mock_region_metadata(region_id_1);
2215 let metadatas = vec![Some(metadata_1.clone()), None];
2216
2217 let metadata_1 = Arc::new(metadata_1);
2218 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2219 MITO_ENGINE_NAME,
2220 Box::new(move |region_id| {
2221 if region_id == region_id_1 {
2222 Ok(metadata_1.clone())
2223 } else {
2224 error::RegionNotFoundSnafu { region_id }.fail()
2225 }
2226 }),
2227 );
2228
2229 mock_region_server.register_engine(engine.clone());
2230 mock_region_server
2231 .inner
2232 .region_map
2233 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2234
2235 let list_metadata_request = ListMetadataRequest {
2237 region_ids: vec![region_id_1.as_u64(), region_id_2.as_u64()],
2238 };
2239 let response = mock_region_server
2240 .handle_list_metadata_request(&list_metadata_request)
2241 .await
2242 .unwrap();
2243 let decoded_metadata: Vec<Option<RegionMetadata>> =
2244 serde_json::from_slice(&response.metadata).unwrap();
2245 assert_eq!(metadatas, decoded_metadata);
2246
2247 mock_region_server
2249 .inner
2250 .region_map
2251 .insert(region_id_2, RegionEngineWithStatus::Ready(engine.clone()));
2252 let response = mock_region_server
2253 .handle_list_metadata_request(&list_metadata_request)
2254 .await
2255 .unwrap();
2256 let decoded_metadata: Vec<Option<RegionMetadata>> =
2257 serde_json::from_slice(&response.metadata).unwrap();
2258 assert_eq!(metadatas, decoded_metadata);
2259 }
2260
2261 #[tokio::test]
2262 async fn test_handle_list_metadata_failed() {
2263 common_telemetry::init_default_ut_logging();
2264
2265 let mut mock_region_server = mock_region_server();
2266 let region_id_1 = RegionId::new(1, 0);
2267
2268 let (engine, _) = MockRegionEngine::with_metadata_mock_fn(
2269 MITO_ENGINE_NAME,
2270 Box::new(move |region_id| {
2271 error::UnexpectedSnafu {
2272 violated: format!("Failed to get region {region_id}"),
2273 }
2274 .fail()
2275 }),
2276 );
2277
2278 mock_region_server.register_engine(engine.clone());
2279 mock_region_server
2280 .inner
2281 .region_map
2282 .insert(region_id_1, RegionEngineWithStatus::Ready(engine.clone()));
2283
2284 let list_metadata_request = ListMetadataRequest {
2286 region_ids: vec![region_id_1.as_u64()],
2287 };
2288 mock_region_server
2289 .handle_list_metadata_request(&list_metadata_request)
2290 .await
2291 .unwrap_err();
2292 }
2293}