1#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_open_test;
25#[cfg(test)]
26mod catchup_test;
27#[cfg(test)]
28mod close_test;
29#[cfg(test)]
30mod compaction_test;
31#[cfg(test)]
32mod create_test;
33#[cfg(test)]
34mod drop_test;
35#[cfg(test)]
36mod edit_region_test;
37#[cfg(test)]
38mod filter_deleted_test;
39#[cfg(test)]
40mod flush_test;
41#[cfg(any(test, feature = "test"))]
42pub mod listener;
43#[cfg(test)]
44mod merge_mode_test;
45#[cfg(test)]
46mod open_test;
47#[cfg(test)]
48mod parallel_test;
49#[cfg(test)]
50mod projection_test;
51#[cfg(test)]
52mod prune_test;
53#[cfg(test)]
54mod row_selector_test;
55#[cfg(test)]
56mod scan_test;
57#[cfg(test)]
58mod set_role_state_test;
59#[cfg(test)]
60mod staging_test;
61#[cfg(test)]
62mod sync_test;
63#[cfg(test)]
64mod truncate_test;
65
66use std::any::Any;
67use std::collections::HashMap;
68use std::sync::Arc;
69use std::time::Instant;
70
71use api::region::RegionResponse;
72use async_trait::async_trait;
73use common_base::Plugins;
74use common_error::ext::BoxedError;
75use common_meta::key::SchemaMetadataManagerRef;
76use common_recordbatch::SendableRecordBatchStream;
77use common_telemetry::{info, tracing};
78use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
79use futures::future::{join_all, try_join_all};
80use futures::stream::{self, Stream, StreamExt};
81use object_store::manager::ObjectStoreManagerRef;
82use snafu::{ensure, OptionExt, ResultExt};
83use store_api::codec::PrimaryKeyEncoding;
84use store_api::logstore::provider::Provider;
85use store_api::logstore::LogStore;
86use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
87use store_api::metric_engine_consts::{
88 MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
89};
90use store_api::region_engine::{
91 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
92 RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
93};
94use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
95use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
96use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
97use store_api::ManifestVersion;
98use tokio::sync::{oneshot, Semaphore};
99
100use crate::cache::{CacheManagerRef, CacheStrategy};
101use crate::config::MitoConfig;
102use crate::error::{
103 InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
104 SerdeJsonSnafu, SerializeColumnMetadataSnafu,
105};
106#[cfg(feature = "enterprise")]
107use crate::extension::BoxedExtensionRangeProviderFactory;
108use crate::manifest::action::RegionEdit;
109use crate::memtable::MemtableStats;
110use crate::metrics::HANDLE_REQUEST_ELAPSED;
111use crate::read::scan_region::{ScanRegion, Scanner};
112use crate::read::stream::ScanBatchStream;
113use crate::region::MitoRegionRef;
114use crate::request::{RegionEditRequest, WorkerRequest};
115use crate::sst::file::FileMeta;
116use crate::sst::file_ref::FileReferenceManagerRef;
117use crate::wal::entry_distributor::{
118 build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
119};
120use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
121use crate::worker::WorkerGroup;
122
/// Engine name returned by the [`RegionEngine::name`] implementation of [`MitoEngine`].
pub const MITO_ENGINE_NAME: &str = "mito";
124
/// Builder that gathers the components needed to construct a [`MitoEngine`].
///
/// The engine is produced by `try_build`.
pub struct MitoEngineBuilder<'a, S: LogStore> {
    /// Passed to `MitoConfig::sanitize` when the engine is built.
    data_home: &'a str,
    /// Engine configuration; sanitized during the build and then shared behind an `Arc`.
    config: MitoConfig,
    /// Log store handed to the worker group and wrapped by the raw WAL entry reader.
    log_store: Arc<S>,
    /// Forwarded to `WorkerGroup::start`.
    object_store_manager: ObjectStoreManagerRef,
    /// Forwarded to `WorkerGroup::start`.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Forwarded to `WorkerGroup::start`.
    file_ref_manager: FileReferenceManagerRef,
    /// Forwarded to `WorkerGroup::start`.
    plugins: Plugins,
    /// Optional factory installed on the engine; `None` unless it is set through
    /// `with_extension_range_provider_factory`.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
136
137impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
138 pub fn new(
139 data_home: &'a str,
140 config: MitoConfig,
141 log_store: Arc<S>,
142 object_store_manager: ObjectStoreManagerRef,
143 schema_metadata_manager: SchemaMetadataManagerRef,
144 file_ref_manager: FileReferenceManagerRef,
145 plugins: Plugins,
146 ) -> Self {
147 Self {
148 data_home,
149 config,
150 log_store,
151 object_store_manager,
152 schema_metadata_manager,
153 file_ref_manager,
154 plugins,
155 #[cfg(feature = "enterprise")]
156 extension_range_provider_factory: None,
157 }
158 }
159
160 #[cfg(feature = "enterprise")]
161 #[must_use]
162 pub fn with_extension_range_provider_factory(
163 self,
164 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
165 ) -> Self {
166 Self {
167 extension_range_provider_factory,
168 ..self
169 }
170 }
171
172 pub async fn try_build(mut self) -> Result<MitoEngine> {
173 self.config.sanitize(self.data_home)?;
174
175 let config = Arc::new(self.config);
176 let workers = WorkerGroup::start(
177 config.clone(),
178 self.log_store.clone(),
179 self.object_store_manager,
180 self.schema_metadata_manager,
181 self.file_ref_manager,
182 self.plugins,
183 )
184 .await?;
185 let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
186 let inner = EngineInner {
187 workers,
188 config,
189 wal_raw_entry_reader,
190 #[cfg(feature = "enterprise")]
191 extension_range_provider_factory: None,
192 };
193
194 #[cfg(feature = "enterprise")]
195 let inner =
196 inner.with_extension_range_provider_factory(self.extension_range_provider_factory);
197
198 Ok(MitoEngine {
199 inner: Arc::new(inner),
200 })
201 }
202}
203
/// Handle to the mito region engine.
///
/// The engine state sits behind an [`Arc`], so clones of this handle refer to the same
/// underlying engine.
#[derive(Clone)]
pub struct MitoEngine {
    /// Shared engine state.
    inner: Arc<EngineInner>,
}
209
210impl MitoEngine {
211 pub async fn new<S: LogStore>(
213 data_home: &str,
214 config: MitoConfig,
215 log_store: Arc<S>,
216 object_store_manager: ObjectStoreManagerRef,
217 schema_metadata_manager: SchemaMetadataManagerRef,
218 file_ref_manager: FileReferenceManagerRef,
219 plugins: Plugins,
220 ) -> Result<MitoEngine> {
221 let builder = MitoEngineBuilder::new(
222 data_home,
223 config,
224 log_store,
225 object_store_manager,
226 schema_metadata_manager,
227 file_ref_manager,
228 plugins,
229 );
230 builder.try_build().await
231 }
232
233 pub fn mito_config(&self) -> &MitoConfig {
234 &self.inner.config
235 }
236
237 pub fn cache_manager(&self) -> CacheManagerRef {
238 self.inner.workers.cache_manager()
239 }
240
241 pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
242 self.inner.workers.file_ref_manager()
243 }
244
245 pub fn is_region_exists(&self, region_id: RegionId) -> bool {
247 self.inner.workers.is_region_exists(region_id)
248 }
249
250 pub fn is_region_opening(&self, region_id: RegionId) -> bool {
252 self.inner.workers.is_region_opening(region_id)
253 }
254
255 pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
257 self.find_region(region_id)
258 .map(|region| region.region_statistic())
259 }
260
261 pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
263 self.find_region(region_id)
264 .map(|r| r.primary_key_encoding())
265 }
266
267 #[tracing::instrument(skip_all)]
272 pub async fn scan_to_stream(
273 &self,
274 region_id: RegionId,
275 request: ScanRequest,
276 ) -> Result<SendableRecordBatchStream, BoxedError> {
277 self.scanner(region_id, request)
278 .await
279 .map_err(BoxedError::new)?
280 .scan()
281 .await
282 }
283
284 pub async fn scan_batch(
286 &self,
287 region_id: RegionId,
288 request: ScanRequest,
289 filter_deleted: bool,
290 ) -> Result<ScanBatchStream> {
291 let mut scan_region = self.scan_region(region_id, request)?;
292 scan_region.set_filter_deleted(filter_deleted);
293 scan_region.scanner().await?.scan_batch()
294 }
295
296 async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
298 self.scan_region(region_id, request)?.scanner().await
299 }
300
301 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
303 self.inner.scan_region(region_id, request)
304 }
305
306 pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
311 let _timer = HANDLE_REQUEST_ELAPSED
312 .with_label_values(&["edit_region"])
313 .start_timer();
314
315 ensure!(
316 is_valid_region_edit(&edit),
317 InvalidRequestSnafu {
318 region_id,
319 reason: "invalid region edit"
320 }
321 );
322
323 let (tx, rx) = oneshot::channel();
324 let request = WorkerRequest::EditRegion(RegionEditRequest {
325 region_id,
326 edit,
327 tx,
328 });
329 self.inner
330 .workers
331 .submit_to_worker(region_id, request)
332 .await?;
333 rx.await.context(RecvSnafu)?
334 }
335
336 #[cfg(test)]
337 pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
338 self.find_region(id)
339 }
340
341 pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
342 self.inner.workers.get_region(region_id)
343 }
344
345 fn encode_manifest_info_to_extensions(
346 region_id: &RegionId,
347 manifest_info: RegionManifestInfo,
348 extensions: &mut HashMap<String, Vec<u8>>,
349 ) -> Result<()> {
350 let region_manifest_info = vec![(*region_id, manifest_info)];
351
352 extensions.insert(
353 MANIFEST_INFO_EXTENSION_KEY.to_string(),
354 RegionManifestInfo::encode_list(®ion_manifest_info).context(SerdeJsonSnafu)?,
355 );
356 info!(
357 "Added manifest info: {:?} to extensions, region_id: {:?}",
358 region_manifest_info, region_id
359 );
360 Ok(())
361 }
362
363 fn encode_column_metadatas_to_extensions(
364 region_id: &RegionId,
365 column_metadatas: Vec<ColumnMetadata>,
366 extensions: &mut HashMap<String, Vec<u8>>,
367 ) -> Result<()> {
368 extensions.insert(
369 TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
370 ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
371 );
372 info!(
373 "Added column metadatas: {:?} to extensions, region_id: {:?}",
374 column_metadatas, region_id
375 );
376 Ok(())
377 }
378
379 pub fn find_memtable_and_sst_stats(
382 &self,
383 region_id: RegionId,
384 ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
385 let region = self
386 .find_region(region_id)
387 .context(RegionNotFoundSnafu { region_id })?;
388
389 let version = region.version();
390 let memtable_stats = version
391 .memtables
392 .list_memtables()
393 .iter()
394 .map(|x| x.stats())
395 .collect::<Vec<_>>();
396
397 let sst_stats = version
398 .ssts
399 .levels()
400 .iter()
401 .flat_map(|level| level.files().map(|x| x.meta_ref()))
402 .cloned()
403 .collect::<Vec<_>>();
404 Ok((memtable_stats, sst_stats))
405 }
406
407 pub fn all_ssts_from_manifest(&self) -> impl Iterator<Item = ManifestSstEntry> + use<'_> {
409 self.inner
410 .workers
411 .all_regions()
412 .flat_map(|region| region.manifest_sst_entries())
413 }
414
415 pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
417 let regions = self.inner.workers.all_regions();
418
419 let mut layers_distinct_table_dirs = HashMap::new();
420 for region in regions {
421 let table_dir = region.access_layer.table_dir();
422 if !layers_distinct_table_dirs.contains_key(table_dir) {
423 layers_distinct_table_dirs
424 .insert(table_dir.to_string(), region.access_layer.clone());
425 }
426 }
427
428 stream::iter(layers_distinct_table_dirs)
429 .map(|(_, access_layer)| access_layer.storage_sst_entries())
430 .flatten()
431 }
432}
433
434fn is_valid_region_edit(edit: &RegionEdit) -> bool {
436 !edit.files_to_add.is_empty()
437 && edit.files_to_remove.is_empty()
438 && matches!(
439 edit,
440 RegionEdit {
441 files_to_add: _,
442 files_to_remove: _,
443 timestamp_ms: _,
444 compaction_time_window: None,
445 flushed_entry_id: None,
446 flushed_sequence: None,
447 }
448 )
449}
450
/// Shared state behind a [`MitoEngine`] handle.
struct EngineInner {
    /// Worker group that owns the regions and receives the requests submitted by the engine.
    workers: WorkerGroup,
    /// Sanitized engine configuration.
    config: Arc<MitoConfig>,
    /// Reader handed to the WAL entry distributor when regions are opened per topic.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    /// Optional factory used to attach an extension range provider to scans of follower
    /// regions.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
462
/// Region open requests grouped by a string key; `prepare_batch_open_requests` uses the
/// Kafka WAL topic as the key.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
464
465fn prepare_batch_open_requests(
467 requests: Vec<(RegionId, RegionOpenRequest)>,
468) -> Result<(
469 TopicGroupedRegionOpenRequests,
470 Vec<(RegionId, RegionOpenRequest)>,
471)> {
472 let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
473 let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
474 for (region_id, request) in requests {
475 let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
476 serde_json::from_str(options).context(SerdeJsonSnafu)?
477 } else {
478 WalOptions::RaftEngine
479 };
480 match options {
481 WalOptions::Kafka(options) => {
482 topic_to_regions
483 .entry(options.topic)
484 .or_default()
485 .push((region_id, request));
486 }
487 WalOptions::RaftEngine | WalOptions::Noop => {
488 remaining_regions.push((region_id, request));
489 }
490 }
491 }
492
493 Ok((topic_to_regions, remaining_regions))
494}
495
496impl EngineInner {
497 #[cfg(feature = "enterprise")]
498 #[must_use]
499 fn with_extension_range_provider_factory(
500 self,
501 extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
502 ) -> Self {
503 Self {
504 extension_range_provider_factory,
505 ..self
506 }
507 }
508
509 async fn stop(&self) -> Result<()> {
511 self.workers.stop().await
512 }
513
514 fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
515 self.workers
516 .get_region(region_id)
517 .context(RegionNotFoundSnafu { region_id })
518 }
519
520 fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
524 let region = self.find_region(region_id)?;
526 Ok(region.metadata())
527 }
528
529 async fn open_topic_regions(
530 &self,
531 topic: String,
532 region_requests: Vec<(RegionId, RegionOpenRequest)>,
533 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
534 let now = Instant::now();
535 let region_ids = region_requests
536 .iter()
537 .map(|(region_id, _)| *region_id)
538 .collect::<Vec<_>>();
539 let provider = Provider::kafka_provider(topic);
540 let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
541 provider.clone(),
542 self.wal_raw_entry_reader.clone(),
543 ®ion_ids,
544 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
545 );
546
547 let mut responses = Vec::with_capacity(region_requests.len());
548 for ((region_id, request), entry_receiver) in
549 region_requests.into_iter().zip(entry_receivers)
550 {
551 let (request, receiver) =
552 WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
553 self.workers.submit_to_worker(region_id, request).await?;
554 responses.push(async move { receiver.await.context(RecvSnafu)? });
555 }
556
557 let distribution =
559 common_runtime::spawn_global(async move { distributor.distribute().await });
560 let responses = join_all(responses).await;
562 distribution.await.context(JoinSnafu)??;
563
564 let num_failure = responses.iter().filter(|r| r.is_err()).count();
565 info!(
566 "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
567 region_ids.len() - num_failure,
568 provider.as_kafka_provider().unwrap(),
570 num_failure,
571 now.elapsed(),
572 );
573 Ok(region_ids.into_iter().zip(responses).collect())
574 }
575
576 async fn handle_batch_open_requests(
577 &self,
578 parallelism: usize,
579 requests: Vec<(RegionId, RegionOpenRequest)>,
580 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
581 let semaphore = Arc::new(Semaphore::new(parallelism));
582 let (topic_to_region_requests, remaining_region_requests) =
583 prepare_batch_open_requests(requests)?;
584 let mut responses =
585 Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
586
587 if !topic_to_region_requests.is_empty() {
588 let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
589 for (topic, region_requests) in topic_to_region_requests {
590 let semaphore_moved = semaphore.clone();
591 tasks.push(async move {
592 let _permit = semaphore_moved.acquire().await.unwrap();
594 self.open_topic_regions(topic, region_requests).await
595 })
596 }
597 let r = try_join_all(tasks).await?;
598 responses.extend(r.into_iter().flatten());
599 }
600
601 if !remaining_region_requests.is_empty() {
602 let mut tasks = Vec::with_capacity(remaining_region_requests.len());
603 let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
604 for (region_id, request) in remaining_region_requests {
605 let semaphore_moved = semaphore.clone();
606 region_ids.push(region_id);
607 tasks.push(async move {
608 let _permit = semaphore_moved.acquire().await.unwrap();
610 let (request, receiver) =
611 WorkerRequest::new_open_region_request(region_id, request, None);
612
613 self.workers.submit_to_worker(region_id, request).await?;
614
615 receiver.await.context(RecvSnafu)?
616 })
617 }
618
619 let results = join_all(tasks).await;
620 responses.extend(region_ids.into_iter().zip(results));
621 }
622
623 Ok(responses)
624 }
625
626 async fn handle_request(
628 &self,
629 region_id: RegionId,
630 request: RegionRequest,
631 ) -> Result<AffectedRows> {
632 let region_metadata = self.get_metadata(region_id).ok();
633 let (request, receiver) =
634 WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
635 self.workers.submit_to_worker(region_id, request).await?;
636
637 receiver.await.context(RecvSnafu)?
638 }
639
640 fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
641 let region = self.find_region(region_id)?;
643 Ok(Some(region.find_committed_sequence()))
644 }
645
646 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
648 let query_start = Instant::now();
649 let region = self.find_region(region_id)?;
651 let version = region.version();
652 let cache_manager = self.workers.cache_manager();
654
655 let scan_region = ScanRegion::new(
656 version,
657 region.access_layer.clone(),
658 request,
659 CacheStrategy::EnableAll(cache_manager),
660 )
661 .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
662 .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
663 .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
664 .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
665 .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
666 .with_start_time(query_start)
667 .with_flat_format(false);
669
670 #[cfg(feature = "enterprise")]
671 let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
672
673 Ok(scan_region)
674 }
675
676 #[cfg(feature = "enterprise")]
677 fn maybe_fill_extension_range_provider(
678 &self,
679 mut scan_region: ScanRegion,
680 region: MitoRegionRef,
681 ) -> ScanRegion {
682 if region.is_follower()
683 && let Some(factory) = self.extension_range_provider_factory.as_ref()
684 {
685 scan_region
686 .set_extension_range_provider(factory.create_extension_range_provider(region));
687 }
688 scan_region
689 }
690
691 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
693 let region = self.find_region(region_id)?;
694 region.set_role(role);
695 Ok(())
696 }
697
698 async fn set_region_role_state_gracefully(
700 &self,
701 region_id: RegionId,
702 region_role_state: SettableRegionRoleState,
703 ) -> Result<SetRegionRoleStateResponse> {
704 let (request, receiver) =
707 WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
708 self.workers.submit_to_worker(region_id, request).await?;
709
710 receiver.await.context(RecvSnafu)
711 }
712
713 async fn sync_region(
714 &self,
715 region_id: RegionId,
716 manifest_info: RegionManifestInfo,
717 ) -> Result<(ManifestVersion, bool)> {
718 ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
719 let manifest_version = manifest_info.data_manifest_version();
720 let (request, receiver) =
721 WorkerRequest::new_sync_region_request(region_id, manifest_version);
722 self.workers.submit_to_worker(region_id, request).await?;
723
724 receiver.await.context(RecvSnafu)?
725 }
726
727 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
728 self.workers.get_region(region_id).map(|region| {
729 if region.is_follower() {
730 RegionRole::Follower
731 } else {
732 RegionRole::Leader
733 }
734 })
735 }
736}
737
738#[async_trait]
739impl RegionEngine for MitoEngine {
740 fn name(&self) -> &str {
741 MITO_ENGINE_NAME
742 }
743
744 #[tracing::instrument(skip_all)]
745 async fn handle_batch_open_requests(
746 &self,
747 parallelism: usize,
748 requests: Vec<(RegionId, RegionOpenRequest)>,
749 ) -> Result<BatchResponses, BoxedError> {
750 self.inner
752 .handle_batch_open_requests(parallelism, requests)
753 .await
754 .map(|responses| {
755 responses
756 .into_iter()
757 .map(|(region_id, response)| {
758 (
759 region_id,
760 response.map(RegionResponse::new).map_err(BoxedError::new),
761 )
762 })
763 .collect::<Vec<_>>()
764 })
765 .map_err(BoxedError::new)
766 }
767
768 #[tracing::instrument(skip_all)]
769 async fn handle_request(
770 &self,
771 region_id: RegionId,
772 request: RegionRequest,
773 ) -> Result<RegionResponse, BoxedError> {
774 let _timer = HANDLE_REQUEST_ELAPSED
775 .with_label_values(&[request.request_type()])
776 .start_timer();
777
778 let is_alter = matches!(request, RegionRequest::Alter(_));
779 let is_create = matches!(request, RegionRequest::Create(_));
780 let mut response = self
781 .inner
782 .handle_request(region_id, request)
783 .await
784 .map(RegionResponse::new)
785 .map_err(BoxedError::new)?;
786
787 if is_alter {
788 self.handle_alter_response(region_id, &mut response)
789 .map_err(BoxedError::new)?;
790 } else if is_create {
791 self.handle_create_response(region_id, &mut response)
792 .map_err(BoxedError::new)?;
793 }
794
795 Ok(response)
796 }
797
798 #[tracing::instrument(skip_all)]
799 async fn handle_query(
800 &self,
801 region_id: RegionId,
802 request: ScanRequest,
803 ) -> Result<RegionScannerRef, BoxedError> {
804 self.scan_region(region_id, request)
805 .map_err(BoxedError::new)?
806 .region_scanner()
807 .await
808 .map_err(BoxedError::new)
809 }
810
811 async fn get_last_seq_num(
812 &self,
813 region_id: RegionId,
814 ) -> Result<Option<SequenceNumber>, BoxedError> {
815 self.inner
816 .get_last_seq_num(region_id)
817 .map_err(BoxedError::new)
818 }
819
820 async fn get_metadata(
822 &self,
823 region_id: RegionId,
824 ) -> std::result::Result<RegionMetadataRef, BoxedError> {
825 self.inner.get_metadata(region_id).map_err(BoxedError::new)
826 }
827
828 async fn stop(&self) -> std::result::Result<(), BoxedError> {
834 self.inner.stop().await.map_err(BoxedError::new)
835 }
836
837 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
838 self.get_region_statistic(region_id)
839 }
840
841 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
842 self.inner
843 .set_region_role(region_id, role)
844 .map_err(BoxedError::new)
845 }
846
847 async fn set_region_role_state_gracefully(
848 &self,
849 region_id: RegionId,
850 region_role_state: SettableRegionRoleState,
851 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
852 let _timer = HANDLE_REQUEST_ELAPSED
853 .with_label_values(&["set_region_role_state_gracefully"])
854 .start_timer();
855
856 self.inner
857 .set_region_role_state_gracefully(region_id, region_role_state)
858 .await
859 .map_err(BoxedError::new)
860 }
861
862 async fn sync_region(
863 &self,
864 region_id: RegionId,
865 manifest_info: RegionManifestInfo,
866 ) -> Result<SyncManifestResponse, BoxedError> {
867 let (_, synced) = self
868 .inner
869 .sync_region(region_id, manifest_info)
870 .await
871 .map_err(BoxedError::new)?;
872
873 Ok(SyncManifestResponse::Mito { synced })
874 }
875
876 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
877 self.inner.role(region_id)
878 }
879
880 fn as_any(&self) -> &dyn Any {
881 self
882 }
883}
884
885impl MitoEngine {
886 fn handle_alter_response(
887 &self,
888 region_id: RegionId,
889 response: &mut RegionResponse,
890 ) -> Result<()> {
891 if let Some(statistic) = self.region_statistic(region_id) {
892 Self::encode_manifest_info_to_extensions(
893 ®ion_id,
894 statistic.manifest,
895 &mut response.extensions,
896 )?;
897 }
898 let column_metadatas = self
899 .inner
900 .find_region(region_id)
901 .ok()
902 .map(|r| r.metadata().column_metadatas.clone());
903 if let Some(column_metadatas) = column_metadatas {
904 Self::encode_column_metadatas_to_extensions(
905 ®ion_id,
906 column_metadatas,
907 &mut response.extensions,
908 )?;
909 }
910 Ok(())
911 }
912
913 fn handle_create_response(
914 &self,
915 region_id: RegionId,
916 response: &mut RegionResponse,
917 ) -> Result<()> {
918 let column_metadatas = self
919 .inner
920 .find_region(region_id)
921 .ok()
922 .map(|r| r.metadata().column_metadatas.clone());
923 if let Some(column_metadatas) = column_metadatas {
924 Self::encode_column_metadatas_to_extensions(
925 ®ion_id,
926 column_metadatas,
927 &mut response.extensions,
928 )?;
929 }
930 Ok(())
931 }
932}
933
// Helpers that are only compiled for tests or with the `test` feature.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Creates an engine for tests.
    ///
    /// Unlike the regular constructor, this starts the worker group through
    /// `WorkerGroup::start_for_test`, which additionally accepts a write buffer manager,
    /// an event listener and a time provider.
    ///
    /// # Errors
    ///
    /// Fails if sanitizing the configuration or starting the worker group fails.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;
        let config = Arc::new(config);

        let wal_raw_entry_reader =
            Arc::new(LogStoreRawEntryReader::new(Arc::clone(&log_store)));
        let workers = WorkerGroup::start_for_test(
            Arc::clone(&config),
            log_store,
            object_store_manager,
            write_buffer_manager,
            listener,
            schema_metadata_manager,
            file_ref_manager,
            time_provider,
        )
        .await?;

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };
        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }

    /// Returns the purge scheduler of the worker group.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        let workers = &self.inner.workers;
        workers.purge_scheduler()
    }
}
980
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    /// Builds the baseline edit that `is_valid_region_edit` accepts: one added file and
    /// every other field empty or unset.
    fn addition_only_edit() -> RegionEdit {
        RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        }
    }

    #[test]
    fn test_is_valid_region_edit() {
        // The baseline edit is valid.
        assert!(is_valid_region_edit(&addition_only_edit()));

        // Each case below changes exactly one field of the baseline and must be rejected.
        let nothing_added = RegionEdit {
            files_to_add: vec![],
            ..addition_only_edit()
        };
        assert!(!is_valid_region_edit(&nothing_added));

        let removes_a_file = RegionEdit {
            files_to_remove: vec![FileMeta::default()],
            ..addition_only_edit()
        };
        assert!(!is_valid_region_edit(&removes_a_file));

        let sets_compaction_window = RegionEdit {
            compaction_time_window: Some(Duration::from_secs(1)),
            ..addition_only_edit()
        };
        assert!(!is_valid_region_edit(&sets_compaction_window));

        let sets_flushed_entry_id = RegionEdit {
            flushed_entry_id: Some(1),
            ..addition_only_edit()
        };
        assert!(!is_valid_region_edit(&sets_flushed_entry_id));

        let sets_flushed_sequence = RegionEdit {
            flushed_sequence: Some(1),
            ..addition_only_edit()
        };
        assert!(!is_valid_region_edit(&sets_flushed_sequence));
    }
}