1#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_open_test;
25#[cfg(test)]
26mod catchup_test;
27#[cfg(test)]
28mod close_test;
29#[cfg(test)]
30mod compaction_test;
31#[cfg(test)]
32mod create_test;
33#[cfg(test)]
34mod drop_test;
35#[cfg(test)]
36mod edit_region_test;
37#[cfg(test)]
38mod filter_deleted_test;
39#[cfg(test)]
40mod flush_test;
41#[cfg(any(test, feature = "test"))]
42pub mod listener;
43#[cfg(test)]
44mod merge_mode_test;
45#[cfg(test)]
46mod open_test;
47#[cfg(test)]
48mod parallel_test;
49#[cfg(test)]
50mod projection_test;
51#[cfg(test)]
52mod prune_test;
53#[cfg(test)]
54mod row_selector_test;
55#[cfg(test)]
56mod scan_test;
57#[cfg(test)]
58mod set_role_state_test;
59#[cfg(test)]
60mod sync_test;
61#[cfg(test)]
62mod truncate_test;
63
64use std::any::Any;
65use std::collections::HashMap;
66use std::sync::Arc;
67use std::time::Instant;
68
69use api::region::RegionResponse;
70use async_trait::async_trait;
71use common_base::Plugins;
72use common_error::ext::BoxedError;
73use common_meta::key::SchemaMetadataManagerRef;
74use common_recordbatch::SendableRecordBatchStream;
75use common_telemetry::{info, tracing};
76use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
77use futures::future::{join_all, try_join_all};
78use object_store::manager::ObjectStoreManagerRef;
79use snafu::{ensure, OptionExt, ResultExt};
80use store_api::codec::PrimaryKeyEncoding;
81use store_api::logstore::provider::Provider;
82use store_api::logstore::LogStore;
83use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
84use store_api::metric_engine_consts::{
85 MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
86};
87use store_api::region_engine::{
88 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
89 RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
90};
91use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
92use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
93use store_api::ManifestVersion;
94use tokio::sync::{oneshot, Semaphore};
95
96use crate::cache::CacheStrategy;
97use crate::config::MitoConfig;
98use crate::error::{
99 InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
100 SerdeJsonSnafu, SerializeColumnMetadataSnafu,
101};
102#[cfg(feature = "enterprise")]
103use crate::extension::BoxedExtensionRangeProviderFactory;
104use crate::manifest::action::RegionEdit;
105use crate::memtable::MemtableStats;
106use crate::metrics::HANDLE_REQUEST_ELAPSED;
107use crate::read::scan_region::{ScanRegion, Scanner};
108use crate::read::stream::ScanBatchStream;
109use crate::region::MitoRegionRef;
110use crate::request::{RegionEditRequest, WorkerRequest};
111use crate::sst::file::FileMeta;
112use crate::wal::entry_distributor::{
113 build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
114};
115use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
116use crate::worker::WorkerGroup;
117
/// Name under which the mito region engine identifies itself.
///
/// This is the value returned by `RegionEngine::name` for `MitoEngine`.
pub const MITO_ENGINE_NAME: &str = "mito";
119
120pub struct MitoEngineBuilder<'a, S: LogStore> {
121 data_home: &'a str,
122 config: MitoConfig,
123 log_store: Arc<S>,
124 object_store_manager: ObjectStoreManagerRef,
125 schema_metadata_manager: SchemaMetadataManagerRef,
126 plugins: Plugins,
127 #[cfg(feature = "enterprise")]
128 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
129}
130
131impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
132 pub fn new(
133 data_home: &'a str,
134 config: MitoConfig,
135 log_store: Arc<S>,
136 object_store_manager: ObjectStoreManagerRef,
137 schema_metadata_manager: SchemaMetadataManagerRef,
138 plugins: Plugins,
139 ) -> Self {
140 Self {
141 data_home,
142 config,
143 log_store,
144 object_store_manager,
145 schema_metadata_manager,
146 plugins,
147 #[cfg(feature = "enterprise")]
148 extension_range_provider_factory: None,
149 }
150 }
151
152 #[cfg(feature = "enterprise")]
153 #[must_use]
154 pub fn with_extension_range_provider_factory(
155 self,
156 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
157 ) -> Self {
158 Self {
159 extension_range_provider_factory,
160 ..self
161 }
162 }
163
164 pub async fn try_build(mut self) -> Result<MitoEngine> {
165 self.config.sanitize(self.data_home)?;
166
167 let config = Arc::new(self.config);
168 let workers = WorkerGroup::start(
169 config.clone(),
170 self.log_store.clone(),
171 self.object_store_manager,
172 self.schema_metadata_manager,
173 self.plugins,
174 )
175 .await?;
176 let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
177 let inner = EngineInner {
178 workers,
179 config,
180 wal_raw_entry_reader,
181 #[cfg(feature = "enterprise")]
182 extension_range_provider_factory: None,
183 };
184
185 #[cfg(feature = "enterprise")]
186 let inner =
187 inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
188
189 Ok(MitoEngine {
190 inner: Arc::new(inner),
191 })
192 }
193}
194
195#[derive(Clone)]
197pub struct MitoEngine {
198 inner: Arc<EngineInner>,
199}
200
201impl MitoEngine {
202 pub async fn new<S: LogStore>(
204 data_home: &str,
205 config: MitoConfig,
206 log_store: Arc<S>,
207 object_store_manager: ObjectStoreManagerRef,
208 schema_metadata_manager: SchemaMetadataManagerRef,
209 plugins: Plugins,
210 ) -> Result<MitoEngine> {
211 let builder = MitoEngineBuilder::new(
212 data_home,
213 config,
214 log_store,
215 object_store_manager,
216 schema_metadata_manager,
217 plugins,
218 );
219 builder.try_build().await
220 }
221
222 pub fn is_region_exists(&self, region_id: RegionId) -> bool {
224 self.inner.workers.is_region_exists(region_id)
225 }
226
227 pub fn is_region_opening(&self, region_id: RegionId) -> bool {
229 self.inner.workers.is_region_opening(region_id)
230 }
231
232 pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
234 self.find_region(region_id)
235 .map(|region| region.region_statistic())
236 }
237
238 pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
240 self.find_region(region_id)
241 .map(|r| r.primary_key_encoding())
242 }
243
244 #[tracing::instrument(skip_all)]
249 pub async fn scan_to_stream(
250 &self,
251 region_id: RegionId,
252 request: ScanRequest,
253 ) -> Result<SendableRecordBatchStream, BoxedError> {
254 self.scanner(region_id, request)
255 .await
256 .map_err(BoxedError::new)?
257 .scan()
258 .await
259 }
260
261 pub async fn scan_batch(
263 &self,
264 region_id: RegionId,
265 request: ScanRequest,
266 filter_deleted: bool,
267 ) -> Result<ScanBatchStream> {
268 let mut scan_region = self.scan_region(region_id, request)?;
269 scan_region.set_filter_deleted(filter_deleted);
270 scan_region.scanner().await?.scan_batch()
271 }
272
273 async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
275 self.scan_region(region_id, request)?.scanner().await
276 }
277
278 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
280 self.inner.scan_region(region_id, request)
281 }
282
283 pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
288 let _timer = HANDLE_REQUEST_ELAPSED
289 .with_label_values(&["edit_region"])
290 .start_timer();
291
292 ensure!(
293 is_valid_region_edit(&edit),
294 InvalidRequestSnafu {
295 region_id,
296 reason: "invalid region edit"
297 }
298 );
299
300 let (tx, rx) = oneshot::channel();
301 let request = WorkerRequest::EditRegion(RegionEditRequest {
302 region_id,
303 edit,
304 tx,
305 });
306 self.inner
307 .workers
308 .submit_to_worker(region_id, request)
309 .await?;
310 rx.await.context(RecvSnafu)?
311 }
312
313 #[cfg(test)]
314 pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
315 self.find_region(id)
316 }
317
318 fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
319 self.inner.workers.get_region(region_id)
320 }
321
322 fn encode_manifest_info_to_extensions(
323 region_id: &RegionId,
324 manifest_info: RegionManifestInfo,
325 extensions: &mut HashMap<String, Vec<u8>>,
326 ) -> Result<()> {
327 let region_manifest_info = vec![(*region_id, manifest_info)];
328
329 extensions.insert(
330 MANIFEST_INFO_EXTENSION_KEY.to_string(),
331 RegionManifestInfo::encode_list(®ion_manifest_info).context(SerdeJsonSnafu)?,
332 );
333 info!(
334 "Added manifest info: {:?} to extensions, region_id: {:?}",
335 region_manifest_info, region_id
336 );
337 Ok(())
338 }
339
340 fn encode_column_metadatas_to_extensions(
341 region_id: &RegionId,
342 column_metadatas: Vec<ColumnMetadata>,
343 extensions: &mut HashMap<String, Vec<u8>>,
344 ) -> Result<()> {
345 extensions.insert(
346 TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
347 ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
348 );
349 info!(
350 "Added column metadatas: {:?} to extensions, region_id: {:?}",
351 column_metadatas, region_id
352 );
353 Ok(())
354 }
355
356 pub fn find_memtable_and_sst_stats(
359 &self,
360 region_id: RegionId,
361 ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
362 let region = self
363 .find_region(region_id)
364 .context(RegionNotFoundSnafu { region_id })?;
365
366 let version = region.version();
367 let memtable_stats = version
368 .memtables
369 .list_memtables()
370 .iter()
371 .map(|x| x.stats())
372 .collect::<Vec<_>>();
373
374 let sst_stats = version
375 .ssts
376 .levels()
377 .iter()
378 .flat_map(|level| level.files().map(|x| x.meta_ref()))
379 .cloned()
380 .collect::<Vec<_>>();
381 Ok((memtable_stats, sst_stats))
382 }
383}
384
385fn is_valid_region_edit(edit: &RegionEdit) -> bool {
387 !edit.files_to_add.is_empty()
388 && edit.files_to_remove.is_empty()
389 && matches!(
390 edit,
391 RegionEdit {
392 files_to_add: _,
393 files_to_remove: _,
394 compaction_time_window: None,
395 flushed_entry_id: None,
396 flushed_sequence: None,
397 }
398 )
399}
400
401struct EngineInner {
403 workers: WorkerGroup,
405 config: Arc<MitoConfig>,
407 wal_raw_entry_reader: Arc<dyn RawEntryReader>,
409 #[cfg(feature = "enterprise")]
410 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
411}
412
413type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
414
415fn prepare_batch_open_requests(
417 requests: Vec<(RegionId, RegionOpenRequest)>,
418) -> Result<(
419 TopicGroupedRegionOpenRequests,
420 Vec<(RegionId, RegionOpenRequest)>,
421)> {
422 let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
423 let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
424 for (region_id, request) in requests {
425 let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
426 serde_json::from_str(options).context(SerdeJsonSnafu)?
427 } else {
428 WalOptions::RaftEngine
429 };
430 match options {
431 WalOptions::Kafka(options) => {
432 topic_to_regions
433 .entry(options.topic)
434 .or_default()
435 .push((region_id, request));
436 }
437 WalOptions::RaftEngine | WalOptions::Noop => {
438 remaining_regions.push((region_id, request));
439 }
440 }
441 }
442
443 Ok((topic_to_regions, remaining_regions))
444}
445
446impl EngineInner {
447 #[cfg(feature = "enterprise")]
448 #[must_use]
449 fn with_extension_range_provider_factory(
450 self,
451 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
452 ) -> Self {
453 Self {
454 extension_range_provider_factory,
455 ..self
456 }
457 }
458
459 async fn stop(&self) -> Result<()> {
461 self.workers.stop().await
462 }
463
464 fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
465 self.workers
466 .get_region(region_id)
467 .context(RegionNotFoundSnafu { region_id })
468 }
469
470 fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
474 let region = self.find_region(region_id)?;
476 Ok(region.metadata())
477 }
478
479 async fn open_topic_regions(
480 &self,
481 topic: String,
482 region_requests: Vec<(RegionId, RegionOpenRequest)>,
483 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
484 let region_ids = region_requests
485 .iter()
486 .map(|(region_id, _)| *region_id)
487 .collect::<Vec<_>>();
488 let provider = Provider::kafka_provider(topic);
489 let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
490 provider,
491 self.wal_raw_entry_reader.clone(),
492 ®ion_ids,
493 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
494 );
495
496 let mut responses = Vec::with_capacity(region_requests.len());
497 for ((region_id, request), entry_receiver) in
498 region_requests.into_iter().zip(entry_receivers)
499 {
500 let (request, receiver) =
501 WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
502 self.workers.submit_to_worker(region_id, request).await?;
503 responses.push(async move { receiver.await.context(RecvSnafu)? });
504 }
505
506 let distribution =
508 common_runtime::spawn_global(async move { distributor.distribute().await });
509 let responses = join_all(responses).await;
511
512 distribution.await.context(JoinSnafu)??;
513 Ok(region_ids.into_iter().zip(responses).collect())
514 }
515
516 async fn handle_batch_open_requests(
517 &self,
518 parallelism: usize,
519 requests: Vec<(RegionId, RegionOpenRequest)>,
520 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
521 let semaphore = Arc::new(Semaphore::new(parallelism));
522 let (topic_to_region_requests, remaining_region_requests) =
523 prepare_batch_open_requests(requests)?;
524 let mut responses =
525 Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
526
527 if !topic_to_region_requests.is_empty() {
528 let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
529 for (topic, region_requests) in topic_to_region_requests {
530 let semaphore_moved = semaphore.clone();
531 tasks.push(async move {
532 let _permit = semaphore_moved.acquire().await.unwrap();
534 self.open_topic_regions(topic, region_requests).await
535 })
536 }
537 let r = try_join_all(tasks).await?;
538 responses.extend(r.into_iter().flatten());
539 }
540
541 if !remaining_region_requests.is_empty() {
542 let mut tasks = Vec::with_capacity(remaining_region_requests.len());
543 let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
544 for (region_id, request) in remaining_region_requests {
545 let semaphore_moved = semaphore.clone();
546 region_ids.push(region_id);
547 tasks.push(async move {
548 let _permit = semaphore_moved.acquire().await.unwrap();
550 let (request, receiver) =
551 WorkerRequest::new_open_region_request(region_id, request, None);
552
553 self.workers.submit_to_worker(region_id, request).await?;
554
555 receiver.await.context(RecvSnafu)?
556 })
557 }
558
559 let results = join_all(tasks).await;
560 responses.extend(region_ids.into_iter().zip(results));
561 }
562
563 Ok(responses)
564 }
565
566 async fn handle_request(
568 &self,
569 region_id: RegionId,
570 request: RegionRequest,
571 ) -> Result<AffectedRows> {
572 let region_metadata = self.get_metadata(region_id).ok();
573 let (request, receiver) =
574 WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
575 self.workers.submit_to_worker(region_id, request).await?;
576
577 receiver.await.context(RecvSnafu)?
578 }
579
580 fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
581 let region = self.find_region(region_id)?;
583 Ok(Some(region.find_committed_sequence()))
584 }
585
586 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
588 let query_start = Instant::now();
589 let region = self.find_region(region_id)?;
591 let version = region.version();
592 let cache_manager = self.workers.cache_manager();
594
595 let scan_region = ScanRegion::new(
596 version,
597 region.access_layer.clone(),
598 request,
599 CacheStrategy::EnableAll(cache_manager),
600 )
601 .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
602 .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
603 .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
604 .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
605 .with_start_time(query_start);
606
607 #[cfg(feature = "enterprise")]
608 let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
609
610 Ok(scan_region)
611 }
612
613 #[cfg(feature = "enterprise")]
614 fn maybe_fill_extension_range_provider(
615 &self,
616 mut scan_region: ScanRegion,
617 region: MitoRegionRef,
618 ) -> ScanRegion {
619 if region.is_follower()
620 && let Some(factory) = self.extension_range_provider_factory.as_ref()
621 {
622 scan_region
623 .set_extension_range_provider(factory.create_extension_range_provider(region));
624 }
625 scan_region
626 }
627
628 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
630 let region = self.find_region(region_id)?;
631 region.set_role(role);
632 Ok(())
633 }
634
635 async fn set_region_role_state_gracefully(
637 &self,
638 region_id: RegionId,
639 region_role_state: SettableRegionRoleState,
640 ) -> Result<SetRegionRoleStateResponse> {
641 let (request, receiver) =
644 WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
645 self.workers.submit_to_worker(region_id, request).await?;
646
647 receiver.await.context(RecvSnafu)
648 }
649
650 async fn sync_region(
651 &self,
652 region_id: RegionId,
653 manifest_info: RegionManifestInfo,
654 ) -> Result<(ManifestVersion, bool)> {
655 ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
656 let manifest_version = manifest_info.data_manifest_version();
657 let (request, receiver) =
658 WorkerRequest::new_sync_region_request(region_id, manifest_version);
659 self.workers.submit_to_worker(region_id, request).await?;
660
661 receiver.await.context(RecvSnafu)?
662 }
663
664 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
665 self.workers.get_region(region_id).map(|region| {
666 if region.is_follower() {
667 RegionRole::Follower
668 } else {
669 RegionRole::Leader
670 }
671 })
672 }
673}
674
675#[async_trait]
676impl RegionEngine for MitoEngine {
677 fn name(&self) -> &str {
678 MITO_ENGINE_NAME
679 }
680
681 #[tracing::instrument(skip_all)]
682 async fn handle_batch_open_requests(
683 &self,
684 parallelism: usize,
685 requests: Vec<(RegionId, RegionOpenRequest)>,
686 ) -> Result<BatchResponses, BoxedError> {
687 self.inner
689 .handle_batch_open_requests(parallelism, requests)
690 .await
691 .map(|responses| {
692 responses
693 .into_iter()
694 .map(|(region_id, response)| {
695 (
696 region_id,
697 response.map(RegionResponse::new).map_err(BoxedError::new),
698 )
699 })
700 .collect::<Vec<_>>()
701 })
702 .map_err(BoxedError::new)
703 }
704
705 #[tracing::instrument(skip_all)]
706 async fn handle_request(
707 &self,
708 region_id: RegionId,
709 request: RegionRequest,
710 ) -> Result<RegionResponse, BoxedError> {
711 let _timer = HANDLE_REQUEST_ELAPSED
712 .with_label_values(&[request.request_type()])
713 .start_timer();
714
715 let is_alter = matches!(request, RegionRequest::Alter(_));
716 let is_create = matches!(request, RegionRequest::Create(_));
717 let mut response = self
718 .inner
719 .handle_request(region_id, request)
720 .await
721 .map(RegionResponse::new)
722 .map_err(BoxedError::new)?;
723
724 if is_alter {
725 self.handle_alter_response(region_id, &mut response)
726 .map_err(BoxedError::new)?;
727 } else if is_create {
728 self.handle_create_response(region_id, &mut response)
729 .map_err(BoxedError::new)?;
730 }
731
732 Ok(response)
733 }
734
735 #[tracing::instrument(skip_all)]
736 async fn handle_query(
737 &self,
738 region_id: RegionId,
739 request: ScanRequest,
740 ) -> Result<RegionScannerRef, BoxedError> {
741 self.scan_region(region_id, request)
742 .map_err(BoxedError::new)?
743 .region_scanner()
744 .await
745 .map_err(BoxedError::new)
746 }
747
748 async fn get_last_seq_num(
749 &self,
750 region_id: RegionId,
751 ) -> Result<Option<SequenceNumber>, BoxedError> {
752 self.inner
753 .get_last_seq_num(region_id)
754 .map_err(BoxedError::new)
755 }
756
757 async fn get_metadata(
759 &self,
760 region_id: RegionId,
761 ) -> std::result::Result<RegionMetadataRef, BoxedError> {
762 self.inner.get_metadata(region_id).map_err(BoxedError::new)
763 }
764
765 async fn stop(&self) -> std::result::Result<(), BoxedError> {
771 self.inner.stop().await.map_err(BoxedError::new)
772 }
773
774 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
775 self.get_region_statistic(region_id)
776 }
777
778 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
779 self.inner
780 .set_region_role(region_id, role)
781 .map_err(BoxedError::new)
782 }
783
784 async fn set_region_role_state_gracefully(
785 &self,
786 region_id: RegionId,
787 region_role_state: SettableRegionRoleState,
788 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
789 let _timer = HANDLE_REQUEST_ELAPSED
790 .with_label_values(&["set_region_role_state_gracefully"])
791 .start_timer();
792
793 self.inner
794 .set_region_role_state_gracefully(region_id, region_role_state)
795 .await
796 .map_err(BoxedError::new)
797 }
798
799 async fn sync_region(
800 &self,
801 region_id: RegionId,
802 manifest_info: RegionManifestInfo,
803 ) -> Result<SyncManifestResponse, BoxedError> {
804 let (_, synced) = self
805 .inner
806 .sync_region(region_id, manifest_info)
807 .await
808 .map_err(BoxedError::new)?;
809
810 Ok(SyncManifestResponse::Mito { synced })
811 }
812
813 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
814 self.inner.role(region_id)
815 }
816
817 fn as_any(&self) -> &dyn Any {
818 self
819 }
820}
821
822impl MitoEngine {
823 fn handle_alter_response(
824 &self,
825 region_id: RegionId,
826 response: &mut RegionResponse,
827 ) -> Result<()> {
828 if let Some(statistic) = self.region_statistic(region_id) {
829 Self::encode_manifest_info_to_extensions(
830 ®ion_id,
831 statistic.manifest,
832 &mut response.extensions,
833 )?;
834 }
835 let column_metadatas = self
836 .inner
837 .find_region(region_id)
838 .ok()
839 .map(|r| r.metadata().column_metadatas.clone());
840 if let Some(column_metadatas) = column_metadatas {
841 Self::encode_column_metadatas_to_extensions(
842 ®ion_id,
843 column_metadatas,
844 &mut response.extensions,
845 )?;
846 }
847 Ok(())
848 }
849
850 fn handle_create_response(
851 &self,
852 region_id: RegionId,
853 response: &mut RegionResponse,
854 ) -> Result<()> {
855 let column_metadatas = self
856 .inner
857 .find_region(region_id)
858 .ok()
859 .map(|r| r.metadata().column_metadatas.clone());
860 if let Some(column_metadatas) = column_metadatas {
861 Self::encode_column_metadatas_to_extensions(
862 ®ion_id,
863 column_metadatas,
864 &mut response.extensions,
865 )?;
866 }
867 Ok(())
868 }
869}
870
#[cfg(any(test, feature = "test"))]
// The test constructor takes every dependency explicitly, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Builds an engine for tests.
    ///
    /// Unlike the regular constructor, the caller supplies an optional write
    /// buffer manager, an optional event listener and a time provider. These
    /// are passed to `WorkerGroup::start_for_test`.
    ///
    /// # Errors
    ///
    /// Returns the error from `MitoConfig::sanitize` or from starting the
    /// worker group.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(Arc::clone(&log_store)));
        let workers = WorkerGroup::start_for_test(
            Arc::clone(&config),
            log_store,
            object_store_manager,
            write_buffer_manager,
            listener,
            schema_metadata_manager,
            time_provider,
        )
        .await?;

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };
        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }

    /// Returns the purge scheduler of the worker group.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        let workers = &self.inner.workers;
        workers.purge_scheduler()
    }
}
915
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    /// Table-driven check of `is_valid_region_edit`: only an edit that purely
    /// adds files is accepted.
    #[test]
    fn test_is_valid_region_edit() {
        let cases = [
            (
                "valid: only adds a file",
                RegionEdit {
                    files_to_add: vec![FileMeta::default()],
                    files_to_remove: vec![],
                    compaction_time_window: None,
                    flushed_entry_id: None,
                    flushed_sequence: None,
                },
                true,
            ),
            (
                "invalid: adds nothing",
                RegionEdit {
                    files_to_add: vec![],
                    files_to_remove: vec![],
                    compaction_time_window: None,
                    flushed_entry_id: None,
                    flushed_sequence: None,
                },
                false,
            ),
            (
                "invalid: removes a file",
                RegionEdit {
                    files_to_add: vec![FileMeta::default()],
                    files_to_remove: vec![FileMeta::default()],
                    compaction_time_window: None,
                    flushed_entry_id: None,
                    flushed_sequence: None,
                },
                false,
            ),
            (
                "invalid: sets the compaction time window",
                RegionEdit {
                    files_to_add: vec![FileMeta::default()],
                    files_to_remove: vec![],
                    compaction_time_window: Some(Duration::from_secs(1)),
                    flushed_entry_id: None,
                    flushed_sequence: None,
                },
                false,
            ),
            (
                "invalid: sets the flushed entry id",
                RegionEdit {
                    files_to_add: vec![FileMeta::default()],
                    files_to_remove: vec![],
                    compaction_time_window: None,
                    flushed_entry_id: Some(1),
                    flushed_sequence: None,
                },
                false,
            ),
            (
                "invalid: sets the flushed sequence",
                RegionEdit {
                    files_to_add: vec![FileMeta::default()],
                    files_to_remove: vec![],
                    compaction_time_window: None,
                    flushed_entry_id: None,
                    flushed_sequence: Some(1),
                },
                false,
            ),
        ];

        for (description, edit, expected) in cases {
            assert_eq!(is_valid_region_edit(&edit), expected, "{description}");
        }
    }
}