#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_catchup_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod bump_committed_sequence_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
pub(crate) mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
mod index_build_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_corrupt;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

#[cfg(test)]
mod remap_manifests_test;

mod puffin_index;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
use common_stat::get_total_memory_bytes;
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
    SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::access_layer::RegionFilePathFactory;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::{
    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

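/// Name of the mito region engine.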
pub const MITO_ENGINE_NAME: &str = "mito";

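/// Builder to create a [MitoEngine].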
pub struct MitoEngineBuilder<'a, S: LogStore> {
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    plugins: Plugins,
    max_concurrent_queries: usize,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
        max_concurrent_queries: usize,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            partition_expr_fetcher,
            max_concurrent_queries,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher.clone(),
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker =
            QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
                .with_on_update(|usage| {
                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
                })
                .with_on_reject(|| {
                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
                });

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            scan_memory_tracker,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}

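/// Region engine based on the mito storage engine.
///
/// Cloning a [MitoEngine] is cheap: all clones share the same inner state.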
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    #[allow(clippy::too_many_arguments)]
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
            plugins,
            0,
        );
        builder.try_build().await
    }

    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }

    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }

    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }

    pub fn gc_limiter(&self) -> GcLimiterRef {
        self.inner.workers.gc_limiter()
    }

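    /// Builds a [FileRefsManifest] snapshot from the file reference manager for the given
    /// regions. Regions that are not currently opened on this engine are skipped.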
    pub async fn get_snapshot_of_file_refs(
        &self,
        file_handle_regions: impl IntoIterator<Item = RegionId>,
        manifest_regions: HashMap<RegionId, Vec<RegionId>>,
    ) -> Result<FileRefsManifest> {
        let file_ref_mgr = self.file_ref_manager();

        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
        let query_regions: Vec<MitoRegionRef> = file_handle_regions
            .into_iter()
            .filter_map(|region_id| self.find_region(region_id))
            .collect();

        let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
            .into_iter()
            .filter_map(|(related_region, queries)| {
                self.find_region(related_region).map(|r| (r, queries))
            })
            .collect();

        file_ref_mgr
            .get_snapshot_of_file_refs(query_regions, related_regions)
            .await
    }

    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_catching_up(region_id)
    }

    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

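    /// Handles the scan `request` and returns a stream of record batches.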
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    pub(crate) async fn scanner(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

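    /// Applies a [RegionEdit] to the region directly.
    ///
    /// Only edits that add files are accepted; see `is_valid_region_edit`.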
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

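    /// Returns the memtable statistics and SST file metadata of the region.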
    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }

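    /// Lists the SST entries recorded in the manifests of all opened regions,
    /// tagging each entry with this node's id.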
    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut results = Vec::new();
        for region in regions {
            let mut entries = region.manifest_sst_entries().await;
            for e in &mut entries {
                e.node_id = node_id;
            }
            results.extend(entries);
        }

        results
    }

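    /// Collects puffin index metadata entries for all opened regions by reading the index
    /// file referenced by each manifest SST entry.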
    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let cache_manager = self.inner.workers.cache_manager();
        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();

        let mut results = Vec::new();

        for region in self.inner.workers.all_regions() {
            let manifest_entries = region.manifest_sst_entries().await;
            let access_layer = region.access_layer.clone();
            let table_dir = access_layer.table_dir().to_string();
            let path_type = access_layer.path_type();
            let object_store = access_layer.object_store().clone();
            let puffin_factory = access_layer.puffin_manager_factory().clone();
            let path_factory = RegionFilePathFactory::new(table_dir, path_type);

            let entry_futures = manifest_entries.into_iter().map(|entry| {
                let object_store = object_store.clone();
                let path_factory = path_factory.clone();
                let puffin_factory = puffin_factory.clone();
                let puffin_metadata_cache = puffin_metadata_cache.clone();
                let bloom_filter_cache = bloom_filter_cache.clone();
                let inverted_index_cache = inverted_index_cache.clone();

                async move {
                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
                        return Vec::new();
                    };

                    let index_version = entry.index_version;
                    let file_id = match FileId::parse_str(&entry.file_id) {
                        Ok(file_id) => file_id,
                        Err(err) => {
                            warn!(
                                err;
                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                                entry.table_dir,
                                entry.file_id
                            );
                            return Vec::new();
                        }
                    };
                    let region_index_id = RegionIndexId::new(
                        RegionFileId::new(entry.region_id, file_id),
                        index_version,
                    );
                    let context = IndexEntryContext {
                        table_dir: &entry.table_dir,
                        index_file_path: index_file_path.as_str(),
                        region_id: entry.region_id,
                        table_id: entry.table_id,
                        region_number: entry.region_number,
                        region_group: entry.region_group,
                        region_sequence: entry.region_sequence,
                        file_id: &entry.file_id,
                        index_file_size: entry.index_file_size,
                        node_id,
                    };

                    let manager = puffin_factory
                        .build(object_store, path_factory)
                        .with_puffin_metadata_cache(puffin_metadata_cache);

                    collect_index_entries_from_puffin(
                        manager,
                        region_index_id,
                        context,
                        bloom_filter_cache,
                        inverted_index_cache,
                    )
                    .await
                }
            });

            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8);
            while let Some(mut metas) = meta_stream.next().await {
                results.append(&mut metas);
            }
        }

        results
    }

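    /// Streams the SST entries found by listing object storage, deduplicated by table dir.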
    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut layers_distinct_table_dirs = HashMap::new();
        for region in regions {
            let table_dir = region.access_layer.table_dir();
            if !layers_distinct_table_dirs.contains_key(table_dir) {
                layers_distinct_table_dirs
                    .insert(table_dir.to_string(), region.access_layer.clone());
            }
        }

        stream::iter(layers_distinct_table_dirs)
            .map(|(_, access_layer)| access_layer.storage_sst_entries())
            .flatten()
            .map(move |entry| {
                entry.map(move |mut entry| {
                    entry.node_id = node_id;
                    entry
                })
            })
    }
}

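/// Checks whether the region edit is valid.
///
/// A valid edit only adds files: `files_to_add` must be non-empty, `files_to_remove` must be
/// empty, and the compaction and flush related fields must be unset.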
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                timestamp_ms: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                ..
            }
        )
}

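/// Inner struct of [MitoEngine], shared by all of its clones.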
struct EngineInner {
    workers: WorkerGroup,
    config: Arc<MitoConfig>,
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    scan_memory_tracker: QueryMemoryTracker,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

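/// Splits open requests into those whose WAL lives in a Kafka topic (grouped by topic) and
/// the remaining requests (raft-engine or noop WAL).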
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

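    /// Opens all regions that share one Kafka topic, replaying WAL entries through a single
    /// entry distributor.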
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }

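    /// Opens multiple regions concurrently, bounded by `parallelism`.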
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

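    /// Catches up all regions that share one Kafka topic, replaying WAL entries through a
    /// single entry distributor.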
    async fn catchup_topic_regions(
        &self,
        provider: Provider,
        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );

        Ok(region_ids.into_iter().zip(responses).collect())
    }

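    /// Catches up multiple regions concurrently, bounded by `parallelism`.
    /// Regions that are not opened on this engine fail with a region-not-found error.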
    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let mut responses = Vec::with_capacity(requests.len());
        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
        let mut remaining_region_requests = vec![];

        for (region_id, request) in requests {
            match self.workers.get_region(region_id) {
                Some(region) => match region.provider.as_kafka_provider() {
                    Some(provider) => {
                        topic_regions
                            .entry(provider.clone())
                            .or_default()
                            .push((region_id, request));
                    }
                    None => {
                        remaining_region_requests.push((region_id, request));
                    }
                },
                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
            }
        }

        let semaphore = Arc::new(Semaphore::new(parallelism));

        if !topic_regions.is_empty() {
            let mut tasks = Vec::with_capacity(topic_regions.len());
            for (provider, region_requests) in topic_regions {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
                        .await
                })
            }

            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_catchup_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

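    /// Handles a region request by sending it to the worker that owns the region.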
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
        self.find_region(region_id)
            .map(|r| r.find_committed_sequence())
    }

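    /// Builds a [ScanRegion] for the request, wiring in caches and index/scan options from
    /// the engine configuration.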
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        let region = self.find_region(region_id)?;
        let version = region.version();
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    async fn remap_manifests(
        &self,
        request: RemapManifestsRequest,
    ) -> Result<RemapManifestsResponse> {
        let region_id = request.region_id;
        let (request, receiver) = WorkerRequest::try_from_remap_manifests_request(request)?;
        self.workers.submit_to_worker(region_id, request).await?;
        let manifests = receiver.await.context(RecvSnafu)??;

        let new_manifests = manifests
            .into_iter()
            .map(|(region_id, manifest)| {
                Ok((
                    region_id,
                    serde_json::to_string(&manifest)
                        .context(SerializeManifestSnafu { region_id })?,
                ))
            })
            .collect::<Result<HashMap<_, _>>>()?;
        Ok(RemapManifestsResponse { new_manifests })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
        Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
    }

    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError> {
        self.inner
            .get_committed_sequence(region_id)
            .map_err(BoxedError::new)
    }

    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    async fn remap_manifests(
        &self,
        request: RemapManifestsRequest,
    ) -> Result<RemapManifestsResponse, BoxedError> {
        self.inner
            .remap_manifests(request)
            .await
            .map_err(BoxedError::new)
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl MitoEngine {
    fn handle_alter_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        if let Some(statistic) = self.region_statistic(region_id) {
            Self::encode_manifest_info_to_extensions(
                &region_id,
                statistic.manifest,
                &mut response.extensions,
            )?;
        }
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }

    fn handle_create_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }
}

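/// Test-only constructors and helpers for [MitoEngine].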
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
            .with_on_update(|usage| {
                SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
            })
            .with_on_reject(|| {
                SCAN_REQUESTS_REJECTED_TOTAL.inc();
            });
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    file_ref_manager,
                    time_provider,
                    partition_expr_fetcher,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                scan_memory_tracker,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Valid: only adds files.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

        // Invalid: adds no files.
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: removes files.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: sets the compaction time window.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: sets the flushed entry id.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

        // Invalid: sets the flushed sequence.
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
    }
}