1#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_open_test;
25#[cfg(test)]
26mod bump_committed_sequence_test;
27#[cfg(test)]
28mod catchup_test;
29#[cfg(test)]
30mod close_test;
31#[cfg(test)]
32mod compaction_test;
33#[cfg(test)]
34mod create_test;
35#[cfg(test)]
36mod drop_test;
37#[cfg(test)]
38mod edit_region_test;
39#[cfg(test)]
40mod filter_deleted_test;
41#[cfg(test)]
42mod flush_test;
43#[cfg(any(test, feature = "test"))]
44pub mod listener;
45#[cfg(test)]
46mod merge_mode_test;
47#[cfg(test)]
48mod open_test;
49#[cfg(test)]
50mod parallel_test;
51#[cfg(test)]
52mod projection_test;
53#[cfg(test)]
54mod prune_test;
55#[cfg(test)]
56mod row_selector_test;
57#[cfg(test)]
58mod scan_corrupt;
59#[cfg(test)]
60mod scan_test;
61#[cfg(test)]
62mod set_role_state_test;
63#[cfg(test)]
64mod staging_test;
65#[cfg(test)]
66mod sync_test;
67#[cfg(test)]
68mod truncate_test;
69
70use std::any::Any;
71use std::collections::HashMap;
72use std::sync::Arc;
73use std::time::Instant;
74
75use api::region::RegionResponse;
76use async_trait::async_trait;
77use common_base::Plugins;
78use common_error::ext::BoxedError;
79use common_meta::key::SchemaMetadataManagerRef;
80use common_recordbatch::SendableRecordBatchStream;
81use common_telemetry::{info, tracing};
82use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
83use futures::future::{join_all, try_join_all};
84use futures::stream::{self, Stream, StreamExt};
85use object_store::manager::ObjectStoreManagerRef;
86use snafu::{OptionExt, ResultExt, ensure};
87use store_api::ManifestVersion;
88use store_api::codec::PrimaryKeyEncoding;
89use store_api::logstore::LogStore;
90use store_api::logstore::provider::Provider;
91use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
92use store_api::metric_engine_consts::{
93 MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
94};
95use store_api::region_engine::{
96 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
97 RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
98};
99use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
100use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
101use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
102use tokio::sync::{Semaphore, oneshot};
103
104use crate::cache::{CacheManagerRef, CacheStrategy};
105use crate::config::MitoConfig;
106use crate::error::{
107 InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
108 SerdeJsonSnafu, SerializeColumnMetadataSnafu,
109};
110#[cfg(feature = "enterprise")]
111use crate::extension::BoxedExtensionRangeProviderFactory;
112use crate::manifest::action::RegionEdit;
113use crate::memtable::MemtableStats;
114use crate::metrics::HANDLE_REQUEST_ELAPSED;
115use crate::read::scan_region::{ScanRegion, Scanner};
116use crate::read::stream::ScanBatchStream;
117use crate::region::MitoRegionRef;
118use crate::region::opener::PartitionExprFetcherRef;
119use crate::request::{RegionEditRequest, WorkerRequest};
120use crate::sst::file::FileMeta;
121use crate::sst::file_ref::FileReferenceManagerRef;
122use crate::wal::entry_distributor::{
123 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
124};
125use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
126use crate::worker::WorkerGroup;
127
/// Name of the mito region engine, used to register and look up the engine.
pub const MITO_ENGINE_NAME: &str = "mito";
129
/// Builder that assembles the components needed to start a [MitoEngine].
pub struct MitoEngineBuilder<'a, S: LogStore> {
    /// Root directory of engine data; also passed to `MitoConfig::sanitize`.
    data_home: &'a str,
    /// Engine configuration (sanitized in `try_build`).
    config: MitoConfig,
    /// Write-ahead-log store shared by all regions.
    log_store: Arc<S>,
    /// Provides object stores backing region storage.
    object_store_manager: ObjectStoreManagerRef,
    /// Accessor to schema-level metadata.
    schema_metadata_manager: SchemaMetadataManagerRef,
    /// Tracks references to SST files.
    file_ref_manager: FileReferenceManagerRef,
    /// Fetches partition expressions for regions.
    partition_expr_fetcher: PartitionExprFetcherRef,
    /// Engine plugins passed to the worker group.
    plugins: Plugins,
    /// Optional factory creating extension range providers for scans.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
142
impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    /// Creates a new builder from the given components.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            partition_expr_fetcher,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    /// Sets the factory used to create extension range providers for scans.
    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    /// Sanitizes the config, starts the worker group, and builds the engine.
    ///
    /// # Errors
    /// Returns an error if the config is invalid or the workers fail to start.
    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher.clone(),
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        // The factory is only attached after construction so the non-enterprise
        // build can share the struct literal above.
        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}
213
/// Region engine implementation for timeseries data.
///
/// Cloning is cheap: all clones share the same `Arc`-wrapped inner engine.
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}
219
impl MitoEngine {
    /// Creates a new [MitoEngine] with the given components and starts its
    /// background worker group. Convenience wrapper around [MitoEngineBuilder].
    #[allow(clippy::too_many_arguments)]
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
            plugins,
        );
        builder.try_build().await
    }

    /// Returns the config of the engine.
    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }

    /// Returns the cache manager shared by the engine's workers.
    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }

    /// Returns the manager tracking SST file references.
    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    /// Returns true if the specific region is currently being opened.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    /// Returns the statistic of the region, or `None` if the region is not found.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    /// Returns the primary key encoding of the region, or `None` if the region
    /// is not found.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

    /// Handles the scan `request` and returns a stream of record batches.
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    /// Scans the region and returns a stream of internal batches.
    ///
    /// When `filter_deleted` is false, deletion markers are kept in the output.
    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    /// Returns a scanner to scan for `request`.
    async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    /// Builds a [ScanRegion] for `request` against the given region.
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

    /// Edits region's metadata by the given `edit` directly.
    ///
    /// Only "add files" edits are accepted (see [is_valid_region_edit]);
    /// anything else is rejected with an invalid-request error.
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        // Submit the edit to the worker that owns the region and wait for it
        // to apply the change.
        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    /// Returns the region with the given id for tests.
    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    /// Looks up the region in the worker group.
    pub(crate) fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    /// Encodes the region's manifest info into the response `extensions` under
    /// [MANIFEST_INFO_EXTENSION_KEY].
    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    /// Encodes the region's column metadatas into the response `extensions`
    /// under [TABLE_COLUMN_METADATA_EXTENSION_KEY].
    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

    /// Returns the stats of the region's memtables and the metadata of its
    /// SST files, taken from the region's current version.
    ///
    /// # Errors
    /// Returns an error if the region is not found.
    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }

    /// Lists all SST entries recorded in the manifests of all regions,
    /// tagging each entry with this node's id.
    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut results = Vec::new();
        for region in regions {
            let mut entries = region.manifest_sst_entries().await;
            for e in &mut entries {
                e.node_id = node_id;
            }
            results.extend(entries);
        }

        results
    }

    /// Streams all SST entries found in storage, tagging each entry with this
    /// node's id.
    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        // Multiple regions may share a table dir, so deduplicate access layers
        // by table dir before listing storage.
        let mut layers_distinct_table_dirs = HashMap::new();
        for region in regions {
            let table_dir = region.access_layer.table_dir();
            if !layers_distinct_table_dirs.contains_key(table_dir) {
                layers_distinct_table_dirs
                    .insert(table_dir.to_string(), region.access_layer.clone());
            }
        }

        stream::iter(layers_distinct_table_dirs)
            .map(|(_, access_layer)| access_layer.storage_sst_entries())
            .flatten()
            .map(move |entry| {
                entry.map(move |mut entry| {
                    entry.node_id = node_id;
                    entry
                })
            })
    }
}
462
463fn is_valid_region_edit(edit: &RegionEdit) -> bool {
465 !edit.files_to_add.is_empty()
466 && edit.files_to_remove.is_empty()
467 && matches!(
468 edit,
469 RegionEdit {
470 files_to_add: _,
471 files_to_remove: _,
472 timestamp_ms: _,
473 compaction_time_window: None,
474 flushed_entry_id: None,
475 flushed_sequence: None,
476 ..
477 }
478 )
479}
480
/// Inner struct of [MitoEngine], shared by all engine clones.
struct EngineInner {
    /// Region workers that execute all region requests.
    workers: WorkerGroup,
    /// Config of the engine.
    config: Arc<MitoConfig>,
    /// Reader to fetch raw WAL entries during region open.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    /// Optional factory creating extension range providers for scans.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
492
/// Open requests grouped by Kafka topic name.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
494
495fn prepare_batch_open_requests(
497 requests: Vec<(RegionId, RegionOpenRequest)>,
498) -> Result<(
499 TopicGroupedRegionOpenRequests,
500 Vec<(RegionId, RegionOpenRequest)>,
501)> {
502 let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
503 let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
504 for (region_id, request) in requests {
505 let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
506 serde_json::from_str(options).context(SerdeJsonSnafu)?
507 } else {
508 WalOptions::RaftEngine
509 };
510 match options {
511 WalOptions::Kafka(options) => {
512 topic_to_regions
513 .entry(options.topic)
514 .or_default()
515 .push((region_id, request));
516 }
517 WalOptions::RaftEngine | WalOptions::Noop => {
518 remaining_regions.push((region_id, request));
519 }
520 }
521 }
522
523 Ok((topic_to_regions, remaining_regions))
524}
525
impl EngineInner {
    /// Sets the factory used to create extension range providers for scans.
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    /// Stops the engine by stopping all region workers.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    /// Looks up the region, returning a region-not-found error if absent.
    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    /// Returns the metadata of the region.
    ///
    /// # Errors
    /// Returns an error if the region is not found.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

    /// Opens all regions that replay from the same Kafka `topic`.
    ///
    /// Builds a WAL entry distributor for the topic, submits an open request
    /// per region, then drives the distributor concurrently with the open
    /// tasks. Returns one result per region, in the input order.
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        // `entry_receivers` is created from `region_ids`, so zipping keeps
        // each receiver paired with its own region request.
        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // The distributor must run concurrently with the awaiting open tasks,
        // since they consume the entries it distributes.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            // Safe: the provider was built by `Provider::kafka_provider` above.
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    /// Opens a batch of regions, at most `parallelism` at a time.
    ///
    /// Kafka-backed regions are opened per topic so regions of one topic share
    /// a single WAL replay; the rest are opened individually.
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        // Limits the concurrency of open tasks across both groups.
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // Safe: the semaphore is never closed.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // Safe: the semaphore is never closed.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            // Individual failures are reported per region instead of aborting
            // the whole batch, hence `join_all` rather than `try_join_all`.
            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    /// Handles a region request by submitting it to the worker that owns the
    /// region and awaiting the result.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        // The region may not exist yet (e.g. for create requests), so the
        // metadata lookup is best effort.
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    /// Returns the committed sequence of the region.
    ///
    /// # Errors
    /// Returns an error if the region is not found.
    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
        self.find_region(region_id)
            .map(|r| r.find_committed_sequence())
    }

    /// Builds a [ScanRegion] for `request`, applying the engine's scan-related
    /// config (parallelism, index toggles, format).
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        let region = self.find_region(region_id)?;
        let version = region.version();
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start)
        .with_flat_format(self.config.default_experimental_flat_format);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    /// Attaches an extension range provider to the scan when the region is a
    /// follower and a factory is configured.
    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    /// Sets the role of the region.
    ///
    /// # Errors
    /// Returns an error if the region is not found.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    /// Sets the role state of the region gracefully via its worker and awaits
    /// the response.
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    /// Syncs the region to the given manifest version.
    ///
    /// Returns the resulting manifest version and whether the region caught up.
    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        // Only mito manifest info is meaningful here.
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    /// Returns the role of the region, or `None` if the region is not found.
    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}
767
#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    /// Opens a batch of regions with bounded parallelism, wrapping each
    /// per-region result into a [RegionResponse].
    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    /// Handles a region request; for alter and create requests the response is
    /// enriched with manifest/column-metadata extensions.
    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        // Remember the request kind before the request is consumed below.
        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    /// Handles a query request and returns a region scanner.
    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError> {
        self.inner
            .get_committed_sequence(region_id)
            .map_err(BoxedError::new)
    }

    /// Retrieves region's metadata.
    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    /// Stops the engine.
    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    /// Syncs the region's manifest to the requested version.
    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
914
915impl MitoEngine {
916 fn handle_alter_response(
917 &self,
918 region_id: RegionId,
919 response: &mut RegionResponse,
920 ) -> Result<()> {
921 if let Some(statistic) = self.region_statistic(region_id) {
922 Self::encode_manifest_info_to_extensions(
923 ®ion_id,
924 statistic.manifest,
925 &mut response.extensions,
926 )?;
927 }
928 let column_metadatas = self
929 .inner
930 .find_region(region_id)
931 .ok()
932 .map(|r| r.metadata().column_metadatas.clone());
933 if let Some(column_metadatas) = column_metadatas {
934 Self::encode_column_metadatas_to_extensions(
935 ®ion_id,
936 column_metadatas,
937 &mut response.extensions,
938 )?;
939 }
940 Ok(())
941 }
942
943 fn handle_create_response(
944 &self,
945 region_id: RegionId,
946 response: &mut RegionResponse,
947 ) -> Result<()> {
948 let column_metadatas = self
949 .inner
950 .find_region(region_id)
951 .ok()
952 .map(|r| r.metadata().column_metadatas.clone());
953 if let Some(column_metadatas) = column_metadatas {
954 Self::encode_column_metadatas_to_extensions(
955 ®ion_id,
956 column_metadatas,
957 &mut response.extensions,
958 )?;
959 }
960 Ok(())
961 }
962}
963
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns a new [MitoEngine] for tests, allowing injection of the write
    /// buffer manager, event listener and time provider.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    file_ref_manager,
                    time_provider,
                    partition_expr_fetcher,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    /// Returns the purge scheduler of the worker group.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}
1012
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: adds a file and changes nothing else.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        // Invalid: `files_to_add` is empty.
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: `files_to_remove` is not empty.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: `compaction_time_window` is set.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        // Invalid: `flushed_entry_id` is set.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        // Invalid: `flushed_sequence` is set.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
    }
}