#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_catchup_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod bump_committed_sequence_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
mod index_build_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_corrupt;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

mod puffin_index;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::access_layer::RegionFilePathFactory;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";

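/// Builder to create a [MitoEngine].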
pub struct MitoEngineBuilder<'a, S: LogStore> {
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    plugins: Plugins,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            partition_expr_fetcher,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

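    /// Sanitizes the config, starts the region workers and builds the [MitoEngine].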
    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher.clone(),
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}

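/// Mito region engine.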
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    #[allow(clippy::too_many_arguments)]
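    /// Creates a new [MitoEngine] via [MitoEngineBuilder] with the given config and stores.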
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
            plugins,
        );
        builder.try_build().await
    }

    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }

    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }

    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }

    pub fn gc_limiter(&self) -> GcLimiterRef {
        self.inner.workers.gc_limiter()
    }

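    /// Returns a snapshot of file references for the given regions that are not yet
    /// recorded in their manifests.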
    pub async fn get_snapshot_of_unmanifested_refs(
        &self,
        region_ids: impl IntoIterator<Item = RegionId>,
    ) -> Result<FileRefsManifest> {
        let file_ref_mgr = self.file_ref_manager();

        let region_ids = region_ids.into_iter().collect::<Vec<_>>();

        let regions: Vec<MitoRegionRef> = region_ids
            .into_iter()
            .map(|region_id| {
                self.find_region(region_id)
                    .with_context(|| RegionNotFoundSnafu { region_id })
            })
            .collect::<Result<_>>()?;

        file_ref_mgr
            .get_snapshot_of_unmanifested_refs(regions)
            .await
    }

    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_catching_up(region_id)
    }

    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

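    /// Scans the region with the given request and returns a record batch stream.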
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

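    /// Scans the region and returns a stream of batches; `filter_deleted` controls whether
    /// deleted rows are filtered out.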
    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

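    /// Applies a [RegionEdit] to the region directly. Only edits that add files are
    /// accepted, see `is_valid_region_edit`.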
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

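    /// Returns the memtable stats and SST file metas of the region.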
    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }

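    /// Lists the SST entries recorded in the manifests of all regions on this node.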
    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut results = Vec::new();
        for region in regions {
            let mut entries = region.manifest_sst_entries().await;
            for e in &mut entries {
                e.node_id = node_id;
            }
            results.extend(entries);
        }

        results
    }

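    /// Collects puffin index metadata entries for the index files of all regions on this node.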
    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let cache_manager = self.inner.workers.cache_manager();
        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();

        let mut results = Vec::new();

        for region in self.inner.workers.all_regions() {
            let manifest_entries = region.manifest_sst_entries().await;
            let access_layer = region.access_layer.clone();
            let table_dir = access_layer.table_dir().to_string();
            let path_type = access_layer.path_type();
            let object_store = access_layer.object_store().clone();
            let puffin_factory = access_layer.puffin_manager_factory().clone();
            let path_factory = RegionFilePathFactory::new(table_dir, path_type);

            let entry_futures = manifest_entries.into_iter().map(|entry| {
                let object_store = object_store.clone();
                let path_factory = path_factory.clone();
                let puffin_factory = puffin_factory.clone();
                let puffin_metadata_cache = puffin_metadata_cache.clone();
                let bloom_filter_cache = bloom_filter_cache.clone();
                let inverted_index_cache = inverted_index_cache.clone();

                async move {
                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
                        return Vec::new();
                    };

                    let Some(index_file_id) = entry.index_file_id.as_ref() else {
                        return Vec::new();
                    };
                    let file_id = match FileId::parse_str(index_file_id) {
                        Ok(file_id) => file_id,
                        Err(err) => {
                            warn!(
                                err;
                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                                entry.table_dir,
                                index_file_id
                            );
                            return Vec::new();
                        }
                    };
                    let region_file_id = RegionFileId::new(entry.region_id, file_id);
                    let context = IndexEntryContext {
                        table_dir: &entry.table_dir,
                        index_file_path: index_file_path.as_str(),
                        region_id: entry.region_id,
                        table_id: entry.table_id,
                        region_number: entry.region_number,
                        region_group: entry.region_group,
                        region_sequence: entry.region_sequence,
                        file_id: index_file_id,
                        index_file_size: entry.index_file_size,
                        node_id,
                    };

                    let manager = puffin_factory
                        .build(object_store, path_factory)
                        .with_puffin_metadata_cache(puffin_metadata_cache);

                    collect_index_entries_from_puffin(
                        manager,
                        region_file_id,
                        context,
                        bloom_filter_cache,
                        inverted_index_cache,
                    )
                    .await
                }
            });

            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8);
            while let Some(mut metas) = meta_stream.next().await {
                results.append(&mut metas);
            }
        }

        results
    }

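    /// Lists the SST entries found in object storage under each distinct table directory.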
    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut layers_distinct_table_dirs = HashMap::new();
        for region in regions {
            let table_dir = region.access_layer.table_dir();
            if !layers_distinct_table_dirs.contains_key(table_dir) {
                layers_distinct_table_dirs
                    .insert(table_dir.to_string(), region.access_layer.clone());
            }
        }

        stream::iter(layers_distinct_table_dirs)
            .map(|(_, access_layer)| access_layer.storage_sst_entries())
            .flatten()
            .map(move |entry| {
                entry.map(move |mut entry| {
                    entry.node_id = node_id;
                    entry
                })
            })
    }
}

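/// Checks whether the region edit is valid: it must add files, must not remove files, and must
/// not set the compaction time window, flushed entry id or flushed sequence.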
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                timestamp_ms: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                ..
            }
        )
}

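/// Inner struct of [MitoEngine], shared by all clones of the engine handle.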
struct EngineInner {
    workers: WorkerGroup,
    config: Arc<MitoConfig>,
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

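/// Groups open requests whose WAL is stored in Kafka by topic and returns the remaining
/// (raft-engine or noop WAL) requests separately.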
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

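    /// Opens all regions that belong to the same Kafka topic, distributing the replayed WAL
    /// entries to the regions through a shared entry distributor.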
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }

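    /// Opens regions in batch with bounded parallelism, grouping regions that share a Kafka
    /// topic so their WAL is read together.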
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    async fn catchup_topic_regions(
        &self,
        provider: Provider,
        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );

        Ok(region_ids.into_iter().zip(responses).collect())
    }

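    /// Catches up regions in batch with bounded parallelism, grouping Kafka WAL regions by
    /// topic; regions that are not opened are reported as errors in the responses.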
    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let mut responses = Vec::with_capacity(requests.len());
        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
        let mut remaining_region_requests = vec![];

        for (region_id, request) in requests {
            match self.workers.get_region(region_id) {
                Some(region) => match region.provider.as_kafka_provider() {
                    Some(provider) => {
                        topic_regions
                            .entry(provider.clone())
                            .or_default()
                            .push((region_id, request));
                    }
                    None => {
                        remaining_region_requests.push((region_id, request));
                    }
                },
                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
            }
        }

        let semaphore = Arc::new(Semaphore::new(parallelism));

        if !topic_regions.is_empty() {
            let mut tasks = Vec::with_capacity(topic_regions.len());
            for (provider, region_requests) in topic_regions {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
                        .await
                })
            }

            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_catchup_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

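    /// Converts the [RegionRequest] into a worker request and submits it to the region's
    /// worker, returning the number of affected rows.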
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
        self.find_region(region_id)
            .map(|r| r.find_committed_sequence())
    }

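    /// Creates a [ScanRegion] for the request, applying the cache strategy and scan-related
    /// engine config.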
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        let region = self.find_region(region_id)?;
        let version = region.version();
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start)
        .with_flat_format(self.config.default_experimental_flat_format);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError> {
        self.inner
            .get_committed_sequence(region_id)
            .map_err(BoxedError::new)
    }

    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl MitoEngine {
    fn handle_alter_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        if let Some(statistic) = self.region_statistic(region_id) {
            Self::encode_manifest_info_to_extensions(
                &region_id,
                statistic.manifest,
                &mut response.extensions,
            )?;
        }
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }

    fn handle_create_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }
}

#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    file_ref_manager,
                    time_provider,
                    partition_expr_fetcher,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

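    /// Returns the purge scheduler of the engine workers.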
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
    }
}