#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_catchup_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod bump_committed_sequence_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
pub(crate) mod compaction_test;
#[cfg(test)]
mod create_test;
#[cfg(test)]
mod drop_test;
#[cfg(test)]
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
mod index_build_test;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod merge_mode_test;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod row_selector_test;
#[cfg(test)]
mod scan_corrupt;
#[cfg(test)]
mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

mod puffin_index;

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
use common_stat::get_total_memory_bytes;
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
use futures::stream::{self, Stream, StreamExt};
use object_store::manager::ObjectStoreManagerRef;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
    RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{
    AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
};
use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry};
use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber};
use tokio::sync::{Semaphore, oneshot};

use crate::access_layer::RegionFilePathFactory;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
    InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::{
    HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::region::opener::PartitionExprFetcherRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::{FileMeta, RegionFileId};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::wal::entry_distributor::{
    DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, build_wal_entry_distributor_and_receivers,
};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
use crate::worker::WorkerGroup;

pub const MITO_ENGINE_NAME: &str = "mito";

pub struct MitoEngineBuilder<'a, S: LogStore> {
    data_home: &'a str,
    config: MitoConfig,
    log_store: Arc<S>,
    object_store_manager: ObjectStoreManagerRef,
    schema_metadata_manager: SchemaMetadataManagerRef,
    file_ref_manager: FileReferenceManagerRef,
    partition_expr_fetcher: PartitionExprFetcherRef,
    plugins: Plugins,
    max_concurrent_queries: usize,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        data_home: &'a str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
        max_concurrent_queries: usize,
    ) -> Self {
        Self {
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            plugins,
            partition_expr_fetcher,
            max_concurrent_queries,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        }
    }

    #[cfg(feature = "enterprise")]
    #[must_use]
    pub fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    pub async fn try_build(mut self) -> Result<MitoEngine> {
        self.config.sanitize(self.data_home)?;

        let config = Arc::new(self.config);
        let workers = WorkerGroup::start(
            config.clone(),
            self.log_store.clone(),
            self.object_store_manager,
            self.schema_metadata_manager,
            self.file_ref_manager,
            self.partition_expr_fetcher.clone(),
            self.plugins,
        )
        .await?;
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker =
            QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
                .with_on_update(|usage| {
                    SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
                })
                .with_on_reject(|| {
                    SCAN_REQUESTS_REJECTED_TOTAL.inc();
                });

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
            scan_memory_tracker,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
        };

        #[cfg(feature = "enterprise")]
        let inner =
            inner.with_extension_range_provider_factory(self.extension_range_provider_factory);

        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }
}

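/// Region engine implementation for the mito storage engine. Cloning a [MitoEngine] is cheap:
/// clones share the same inner state.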
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}

impl MitoEngine {
    #[allow(clippy::too_many_arguments)]
    pub async fn new<S: LogStore>(
        data_home: &str,
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let builder = MitoEngineBuilder::new(
            data_home,
            config,
            log_store,
            object_store_manager,
            schema_metadata_manager,
            file_ref_manager,
            partition_expr_fetcher,
            plugins,
            0,
        );
        builder.try_build().await
    }

    pub fn mito_config(&self) -> &MitoConfig {
        &self.inner.config
    }

    pub fn cache_manager(&self) -> CacheManagerRef {
        self.inner.workers.cache_manager()
    }

    pub fn file_ref_manager(&self) -> FileReferenceManagerRef {
        self.inner.workers.file_ref_manager()
    }

    pub fn gc_limiter(&self) -> GcLimiterRef {
        self.inner.workers.gc_limiter()
    }

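    /// Builds a [FileRefsManifest] snapshot of file references for the given regions. Regions
    /// that are not opened on this node are skipped.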
    pub async fn get_snapshot_of_file_refs(
        &self,
        file_handle_regions: impl IntoIterator<Item = RegionId>,
        manifest_regions: HashMap<RegionId, Vec<RegionId>>,
    ) -> Result<FileRefsManifest> {
        let file_ref_mgr = self.file_ref_manager();

        let file_handle_regions = file_handle_regions.into_iter().collect::<Vec<_>>();
        let query_regions: Vec<MitoRegionRef> = file_handle_regions
            .into_iter()
            .filter_map(|region_id| self.find_region(region_id))
            .collect();

        let related_regions: Vec<(MitoRegionRef, Vec<RegionId>)> = manifest_regions
            .into_iter()
            .filter_map(|(related_region, queries)| {
                self.find_region(related_region).map(|r| (r, queries))
            })
            .collect();

        file_ref_mgr
            .get_snapshot_of_file_refs(query_regions, related_regions)
            .await
    }

    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    pub fn is_region_catching_up(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_catching_up(region_id)
    }

    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.find_region(region_id)
            .map(|region| region.region_statistic())
    }

    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.find_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

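    /// Scans the region with the given request and returns a stream of record batches.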
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .await
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    pub async fn scan_batch(
        &self,
        region_id: RegionId,
        request: ScanRequest,
        filter_deleted: bool,
    ) -> Result<ScanBatchStream> {
        let mut scan_region = self.scan_region(region_id, request)?;
        scan_region.set_filter_deleted(filter_deleted);
        scan_region.scanner().await?.scan_batch()
    }

    pub(crate) async fn scanner(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner().await
    }

    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

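    /// Applies a [RegionEdit] to the region directly. Only edits that add files and leave all
    /// other fields untouched are accepted; anything else is rejected as an invalid request.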
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        rx.await.context(RecvSnafu)?
    }

    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.find_region(id)
    }

    pub fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
        self.inner.workers.get_region(region_id)
    }

    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }

    fn encode_column_metadatas_to_extensions(
        region_id: &RegionId,
        column_metadatas: Vec<ColumnMetadata>,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        extensions.insert(
            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
        );
        info!(
            "Added column metadatas: {:?} to extensions, region_id: {:?}",
            column_metadatas, region_id
        );
        Ok(())
    }

    pub fn find_memtable_and_sst_stats(
        &self,
        region_id: RegionId,
    ) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
        let region = self
            .find_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        let version = region.version();
        let memtable_stats = version
            .memtables
            .list_memtables()
            .iter()
            .map(|x| x.stats())
            .collect::<Vec<_>>();

        let sst_stats = version
            .ssts
            .levels()
            .iter()
            .flat_map(|level| level.files().map(|x| x.meta_ref()))
            .cloned()
            .collect::<Vec<_>>();
        Ok((memtable_stats, sst_stats))
    }

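    /// Lists the SST entries recorded in the manifests of all regions opened on this node.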
    pub async fn all_ssts_from_manifest(&self) -> Vec<ManifestSstEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut results = Vec::new();
        for region in regions {
            let mut entries = region.manifest_sst_entries().await;
            for e in &mut entries {
                e.node_id = node_id;
            }
            results.extend(entries);
        }

        results
    }

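    /// Collects puffin index metadata entries for the SST index files of all regions opened on
    /// this node.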
    pub async fn all_index_metas(&self) -> Vec<PuffinIndexMetaEntry> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let cache_manager = self.inner.workers.cache_manager();
        let puffin_metadata_cache = cache_manager.puffin_metadata_cache().cloned();
        let bloom_filter_cache = cache_manager.bloom_filter_index_cache().cloned();
        let inverted_index_cache = cache_manager.inverted_index_cache().cloned();

        let mut results = Vec::new();

        for region in self.inner.workers.all_regions() {
            let manifest_entries = region.manifest_sst_entries().await;
            let access_layer = region.access_layer.clone();
            let table_dir = access_layer.table_dir().to_string();
            let path_type = access_layer.path_type();
            let object_store = access_layer.object_store().clone();
            let puffin_factory = access_layer.puffin_manager_factory().clone();
            let path_factory = RegionFilePathFactory::new(table_dir, path_type);

            let entry_futures = manifest_entries.into_iter().map(|entry| {
                let object_store = object_store.clone();
                let path_factory = path_factory.clone();
                let puffin_factory = puffin_factory.clone();
                let puffin_metadata_cache = puffin_metadata_cache.clone();
                let bloom_filter_cache = bloom_filter_cache.clone();
                let inverted_index_cache = inverted_index_cache.clone();

                async move {
                    let Some(index_file_path) = entry.index_file_path.as_ref() else {
                        return Vec::new();
                    };

                    let Some(index_file_id) = entry.index_file_id.as_ref() else {
                        return Vec::new();
                    };
                    let file_id = match FileId::parse_str(index_file_id) {
                        Ok(file_id) => file_id,
                        Err(err) => {
                            warn!(
                                err;
                                "Failed to parse puffin index file id, table_dir: {}, file_id: {}",
                                entry.table_dir,
                                index_file_id
                            );
                            return Vec::new();
                        }
                    };
                    let region_file_id = RegionFileId::new(entry.region_id, file_id);
                    let context = IndexEntryContext {
                        table_dir: &entry.table_dir,
                        index_file_path: index_file_path.as_str(),
                        region_id: entry.region_id,
                        table_id: entry.table_id,
                        region_number: entry.region_number,
                        region_group: entry.region_group,
                        region_sequence: entry.region_sequence,
                        file_id: index_file_id,
                        index_file_size: entry.index_file_size,
                        node_id,
                    };

                    let manager = puffin_factory
                        .build(object_store, path_factory)
                        .with_puffin_metadata_cache(puffin_metadata_cache);

                    collect_index_entries_from_puffin(
                        manager,
                        region_file_id,
                        context,
                        bloom_filter_cache,
                        inverted_index_cache,
                    )
                    .await
                }
            });

            let mut meta_stream = stream::iter(entry_futures).buffer_unordered(8);
            while let Some(mut metas) = meta_stream.next().await {
                results.append(&mut metas);
            }
        }

        results
    }

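    /// Returns a stream of SST entries discovered by listing object storage, one listing per
    /// distinct table directory of the opened regions.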
    pub fn all_ssts_from_storage(&self) -> impl Stream<Item = Result<StorageSstEntry>> {
        let node_id = self.inner.workers.file_ref_manager().node_id();
        let regions = self.inner.workers.all_regions();

        let mut layers_distinct_table_dirs = HashMap::new();
        for region in regions {
            let table_dir = region.access_layer.table_dir();
            if !layers_distinct_table_dirs.contains_key(table_dir) {
                layers_distinct_table_dirs
                    .insert(table_dir.to_string(), region.access_layer.clone());
            }
        }

        stream::iter(layers_distinct_table_dirs)
            .map(|(_, access_layer)| access_layer.storage_sst_entries())
            .flatten()
            .map(move |entry| {
                entry.map(move |mut entry| {
                    entry.node_id = node_id;
                    entry
                })
            })
    }
}

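/// Returns true if the region edit is valid for [MitoEngine::edit_region]: it must add at least
/// one file and must not remove files or set compaction/flush related fields.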
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
    !edit.files_to_add.is_empty()
        && edit.files_to_remove.is_empty()
        && matches!(
            edit,
            RegionEdit {
                files_to_add: _,
                files_to_remove: _,
                timestamp_ms: _,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                ..
            }
        )
}

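/// Inner state shared by [MitoEngine] and its clones.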
struct EngineInner {
    workers: WorkerGroup,
    config: Arc<MitoConfig>,
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
    scan_memory_tracker: QueryMemoryTracker,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}

type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;

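/// Groups open requests by Kafka topic; requests that use the raft-engine or noop WAL are
/// returned separately.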
fn prepare_batch_open_requests(
    requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<(
    TopicGroupedRegionOpenRequests,
    Vec<(RegionId, RegionOpenRequest)>,
)> {
    let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
    let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
    for (region_id, request) in requests {
        let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
            serde_json::from_str(options).context(SerdeJsonSnafu)?
        } else {
            WalOptions::RaftEngine
        };
        match options {
            WalOptions::Kafka(options) => {
                topic_to_regions
                    .entry(options.topic)
                    .or_default()
                    .push((region_id, request));
            }
            WalOptions::RaftEngine | WalOptions::Noop => {
                remaining_regions.push((region_id, request));
            }
        }
    }

    Ok((topic_to_regions, remaining_regions))
}

impl EngineInner {
    #[cfg(feature = "enterprise")]
    #[must_use]
    fn with_extension_range_provider_factory(
        self,
        extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
    ) -> Self {
        Self {
            extension_range_provider_factory,
            ..self
        }
    }

    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
        self.workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })
    }

    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        let region = self.find_region(region_id)?;
        Ok(region.metadata())
    }

    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

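        // Distributes WAL entries to the receivers in a background task, waits for all regions to
        // finish opening, then joins the distributor task.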
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Opened {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    async fn catchup_topic_regions(
        &self,
        provider: Provider,
        region_requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let now = Instant::now();
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

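        // Same pattern as `open_topic_regions`: distribute WAL entries in the background while
        // the regions catch up, then join the distributor task.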
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;
        distribution.await.context(JoinSnafu)??;

        let num_failure = responses.iter().filter(|r| r.is_err()).count();
        info!(
            "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}",
            region_ids.len() - num_failure,
            provider.as_kafka_provider().unwrap(),
            num_failure,
            now.elapsed(),
        );

        Ok(region_ids.into_iter().zip(responses).collect())
    }

    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let mut responses = Vec::with_capacity(requests.len());
        let mut topic_regions: HashMap<Arc<KafkaProvider>, Vec<_>> = HashMap::new();
        let mut remaining_region_requests = vec![];

        for (region_id, request) in requests {
            match self.workers.get_region(region_id) {
                Some(region) => match region.provider.as_kafka_provider() {
                    Some(provider) => {
                        topic_regions
                            .entry(provider.clone())
                            .or_default()
                            .push((region_id, request));
                    }
                    None => {
                        remaining_region_requests.push((region_id, request));
                    }
                },
                None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())),
            }
        }

        let semaphore = Arc::new(Semaphore::new(parallelism));

        if !topic_regions.is_empty() {
            let mut tasks = Vec::with_capacity(topic_regions.len());
            for (provider, region_requests) in topic_regions {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.catchup_topic_regions(Provider::Kafka(provider), region_requests)
                        .await
                })
            }

            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_catchup_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn get_committed_sequence(&self, region_id: RegionId) -> Result<SequenceNumber> {
        self.find_region(region_id)
            .map(|r| r.find_committed_sequence())
    }

    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        let region = self.find_region(region_id)?;
        let version = region.version();
        let cache_manager = self.workers.cache_manager();

        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);

        #[cfg(feature = "enterprise")]
        let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

        Ok(scan_region)
    }

    #[cfg(feature = "enterprise")]
    fn maybe_fill_extension_range_provider(
        &self,
        mut scan_region: ScanRegion,
        region: MitoRegionRef,
    ) -> ScanRegion {
        if region.is_follower()
            && let Some(factory) = self.extension_range_provider_factory.as_ref()
        {
            scan_region
                .set_extension_range_provider(factory.create_extension_range_provider(region));
        }
        scan_region
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self.find_region(region_id)?;
        region.set_role(role);
        Ok(())
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}

#[async_trait]
impl RegionEngine for MitoEngine {
    fn name(&self) -> &str {
        MITO_ENGINE_NAME
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_open_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_batch_catchup_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionCatchupRequest)>,
    ) -> Result<BatchResponses, BoxedError> {
        self.inner
            .handle_batch_catchup_requests(parallelism, requests)
            .await
            .map(|responses| {
                responses
                    .into_iter()
                    .map(|(region_id, response)| {
                        (
                            region_id,
                            response.map(RegionResponse::new).map_err(BoxedError::new),
                        )
                    })
                    .collect::<Vec<_>>()
            })
            .map_err(BoxedError::new)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<RegionResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&[request.request_type()])
            .start_timer();

        let is_alter = matches!(request, RegionRequest::Alter(_));
        let is_create = matches!(request, RegionRequest::Create(_));
        let mut response = self
            .inner
            .handle_request(region_id, request)
            .await
            .map(RegionResponse::new)
            .map_err(BoxedError::new)?;

        if is_alter {
            self.handle_alter_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        } else if is_create {
            self.handle_create_response(region_id, &mut response)
                .map_err(BoxedError::new)?;
        }

        Ok(response)
    }

    #[tracing::instrument(skip_all)]
    async fn handle_query(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<RegionScannerRef, BoxedError> {
        self.scan_region(region_id, request)
            .map_err(BoxedError::new)?
            .region_scanner()
            .await
            .map_err(BoxedError::new)
    }

    fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
        Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
    }

    async fn get_committed_sequence(
        &self,
        region_id: RegionId,
    ) -> Result<SequenceNumber, BoxedError> {
        self.inner
            .get_committed_sequence(region_id)
            .map_err(BoxedError::new)
    }

    async fn get_metadata(
        &self,
        region_id: RegionId,
    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
        self.inner.get_metadata(region_id).map_err(BoxedError::new)
    }

    async fn stop(&self) -> std::result::Result<(), BoxedError> {
        self.inner.stop().await.map_err(BoxedError::new)
    }

    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.get_region_statistic(region_id)
    }

    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
        self.inner
            .set_region_role(region_id, role)
            .map_err(BoxedError::new)
    }

    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse, BoxedError> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["set_region_role_state_gracefully"])
            .start_timer();

        self.inner
            .set_region_role_state_gracefully(region_id, region_role_state)
            .await
            .map_err(BoxedError::new)
    }

    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<SyncManifestResponse, BoxedError> {
        let (_, synced) = self
            .inner
            .sync_region(region_id, manifest_info)
            .await
            .map_err(BoxedError::new)?;

        Ok(SyncManifestResponse::Mito { synced })
    }

    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.inner.role(region_id)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl MitoEngine {
    fn handle_alter_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        if let Some(statistic) = self.region_statistic(region_id) {
            Self::encode_manifest_info_to_extensions(
                &region_id,
                statistic.manifest,
                &mut response.extensions,
            )?;
        }
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }

    fn handle_create_response(
        &self,
        region_id: RegionId,
        response: &mut RegionResponse,
    ) -> Result<()> {
        let column_metadatas = self
            .inner
            .find_region(region_id)
            .ok()
            .map(|r| r.metadata().column_metadatas.clone());
        if let Some(column_metadatas) = column_metadatas {
            Self::encode_column_metadatas_to_extensions(
                &region_id,
                column_metadatas,
                &mut response.extensions,
            )?;
        }
        Ok(())
    }
}

#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        let total_memory = get_total_memory_bytes().max(0) as u64;
        let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
        let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
            .with_on_update(|usage| {
                SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
            })
            .with_on_reject(|| {
                SCAN_REQUESTS_REJECTED_TOTAL.inc();
            });
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    file_ref_manager,
                    time_provider,
                    partition_expr_fetcher,
                )
                .await?,
                config,
                wal_raw_entry_reader,
                scan_memory_tracker,
                #[cfg(feature = "enterprise")]
                extension_range_provider_factory: None,
            }),
        })
    }

    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
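        // Valid: the edit only adds files.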
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(is_valid_region_edit(&edit));

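        // Invalid: nothing to add.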
        let edit = RegionEdit {
            files_to_add: vec![],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

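        // Invalid: the edit also removes files.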
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![FileMeta::default()],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));

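        // Invalid: compaction or flush related fields are set (each checked separately below).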
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: Some(Duration::from_secs(1)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: Some(1),
            flushed_sequence: None,
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
        let edit = RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            timestamp_ms: None,
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: Some(1),
            committed_sequence: None,
        };
        assert!(!is_valid_region_edit(&edit));
    }
}