1#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_open_test;
25#[cfg(test)]
26mod catchup_test;
27#[cfg(test)]
28mod close_test;
29#[cfg(test)]
30mod compaction_test;
31#[cfg(test)]
32mod create_test;
33#[cfg(test)]
34mod drop_test;
35#[cfg(test)]
36mod edit_region_test;
37#[cfg(test)]
38mod filter_deleted_test;
39#[cfg(test)]
40mod flush_test;
41#[cfg(any(test, feature = "test"))]
42pub mod listener;
43#[cfg(test)]
44mod merge_mode_test;
45#[cfg(test)]
46mod open_test;
47#[cfg(test)]
48mod parallel_test;
49#[cfg(test)]
50mod projection_test;
51#[cfg(test)]
52mod prune_test;
53#[cfg(test)]
54mod row_selector_test;
55#[cfg(test)]
56mod set_role_state_test;
57#[cfg(test)]
58mod sync_test;
59#[cfg(test)]
60mod truncate_test;
61
62use std::any::Any;
63use std::collections::HashMap;
64use std::sync::Arc;
65use std::time::Instant;
66
67use api::region::RegionResponse;
68use async_trait::async_trait;
69use common_base::Plugins;
70use common_error::ext::BoxedError;
71use common_meta::key::SchemaMetadataManagerRef;
72use common_recordbatch::SendableRecordBatchStream;
73use common_telemetry::{info, tracing};
74use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
75use futures::future::{join_all, try_join_all};
76use object_store::manager::ObjectStoreManagerRef;
77use snafu::{ensure, OptionExt, ResultExt};
78use store_api::codec::PrimaryKeyEncoding;
79use store_api::logstore::provider::Provider;
80use store_api::logstore::LogStore;
81use store_api::manifest::ManifestVersion;
82use store_api::metadata::RegionMetadataRef;
83use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
84use store_api::region_engine::{
85 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
86 RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
87};
88use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
89use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
90use tokio::sync::{oneshot, Semaphore};
91
92use crate::cache::CacheStrategy;
93use crate::config::MitoConfig;
94use crate::error::{
95 InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
96 SerdeJsonSnafu,
97};
98use crate::manifest::action::RegionEdit;
99use crate::metrics::HANDLE_REQUEST_ELAPSED;
100use crate::read::scan_region::{ScanRegion, Scanner};
101use crate::request::{RegionEditRequest, WorkerRequest};
102use crate::wal::entry_distributor::{
103 build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
104};
105use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
106use crate::worker::WorkerGroup;
107
/// Name under which this engine is registered (returned by [`RegionEngine::name`]).
pub const MITO_ENGINE_NAME: &str = "mito";

/// Handle to the mito region engine.
///
/// All state lives in a shared [`EngineInner`] behind an [`Arc`], so cloning the
/// handle only bumps a reference count.
#[derive(Clone)]
pub struct MitoEngine {
    inner: Arc<EngineInner>,
}
115
116impl MitoEngine {
117 pub async fn new<S: LogStore>(
119 data_home: &str,
120 mut config: MitoConfig,
121 log_store: Arc<S>,
122 object_store_manager: ObjectStoreManagerRef,
123 schema_metadata_manager: SchemaMetadataManagerRef,
124 plugins: Plugins,
125 ) -> Result<MitoEngine> {
126 config.sanitize(data_home)?;
127
128 Ok(MitoEngine {
129 inner: Arc::new(
130 EngineInner::new(
131 config,
132 log_store,
133 object_store_manager,
134 schema_metadata_manager,
135 plugins,
136 )
137 .await?,
138 ),
139 })
140 }
141
142 pub fn is_region_exists(&self, region_id: RegionId) -> bool {
144 self.inner.workers.is_region_exists(region_id)
145 }
146
147 pub fn is_region_opening(&self, region_id: RegionId) -> bool {
149 self.inner.workers.is_region_opening(region_id)
150 }
151
152 pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
154 self.inner
155 .workers
156 .get_region(region_id)
157 .map(|region| region.region_statistic())
158 }
159
160 pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
162 self.inner
163 .workers
164 .get_region(region_id)
165 .map(|r| r.primary_key_encoding())
166 }
167
168 #[tracing::instrument(skip_all)]
173 pub async fn scan_to_stream(
174 &self,
175 region_id: RegionId,
176 request: ScanRequest,
177 ) -> Result<SendableRecordBatchStream, BoxedError> {
178 self.scanner(region_id, request)
179 .map_err(BoxedError::new)?
180 .scan()
181 .await
182 }
183
184 fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
186 self.scan_region(region_id, request)?.scanner()
187 }
188
189 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
191 self.inner.scan_region(region_id, request)
192 }
193
194 pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
199 let _timer = HANDLE_REQUEST_ELAPSED
200 .with_label_values(&["edit_region"])
201 .start_timer();
202
203 ensure!(
204 is_valid_region_edit(&edit),
205 InvalidRequestSnafu {
206 region_id,
207 reason: "invalid region edit"
208 }
209 );
210
211 let (tx, rx) = oneshot::channel();
212 let request = WorkerRequest::EditRegion(RegionEditRequest {
213 region_id,
214 edit,
215 tx,
216 });
217 self.inner
218 .workers
219 .submit_to_worker(region_id, request)
220 .await?;
221 rx.await.context(RecvSnafu)?
222 }
223
224 #[cfg(test)]
225 pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
226 self.inner.workers.get_region(id)
227 }
228
229 fn encode_manifest_info_to_extensions(
230 region_id: &RegionId,
231 manifest_info: RegionManifestInfo,
232 extensions: &mut HashMap<String, Vec<u8>>,
233 ) -> Result<()> {
234 let region_manifest_info = vec![(*region_id, manifest_info)];
235
236 extensions.insert(
237 MANIFEST_INFO_EXTENSION_KEY.to_string(),
238 RegionManifestInfo::encode_list(®ion_manifest_info).context(SerdeJsonSnafu)?,
239 );
240 info!(
241 "Added manifest info: {:?} to extensions, region_id: {:?}",
242 region_manifest_info, region_id
243 );
244 Ok(())
245 }
246}
247
248fn is_valid_region_edit(edit: &RegionEdit) -> bool {
250 !edit.files_to_add.is_empty()
251 && edit.files_to_remove.is_empty()
252 && matches!(
253 edit,
254 RegionEdit {
255 files_to_add: _,
256 files_to_remove: _,
257 compaction_time_window: None,
258 flushed_entry_id: None,
259 flushed_sequence: None,
260 }
261 )
262}
263
/// Shared state behind a [`MitoEngine`] handle.
struct EngineInner {
    /// Worker group that region requests are submitted to and regions are looked up from.
    workers: WorkerGroup,
    /// Engine config; `MitoEngine::new` and `MitoEngine::new_for_test` sanitize it
    /// before it is stored here.
    config: Arc<MitoConfig>,
    /// Raw WAL entry reader over the log store, handed to the per-topic entry
    /// distributors built when opening Kafka-backed regions.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}
273
/// Region open requests grouped by the Kafka topic named in their WAL options.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
275
276fn prepare_batch_open_requests(
278 requests: Vec<(RegionId, RegionOpenRequest)>,
279) -> Result<(
280 TopicGroupedRegionOpenRequests,
281 Vec<(RegionId, RegionOpenRequest)>,
282)> {
283 let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
284 let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
285 for (region_id, request) in requests {
286 let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
287 serde_json::from_str(options).context(SerdeJsonSnafu)?
288 } else {
289 WalOptions::RaftEngine
290 };
291 match options {
292 WalOptions::Kafka(options) => {
293 topic_to_regions
294 .entry(options.topic)
295 .or_default()
296 .push((region_id, request));
297 }
298 WalOptions::RaftEngine | WalOptions::Noop => {
299 remaining_regions.push((region_id, request));
300 }
301 }
302 }
303
304 Ok((topic_to_regions, remaining_regions))
305}
306
307impl EngineInner {
308 async fn new<S: LogStore>(
310 config: MitoConfig,
311 log_store: Arc<S>,
312 object_store_manager: ObjectStoreManagerRef,
313 schema_metadata_manager: SchemaMetadataManagerRef,
314 plugins: Plugins,
315 ) -> Result<EngineInner> {
316 let config = Arc::new(config);
317 let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
318 Ok(EngineInner {
319 workers: WorkerGroup::start(
320 config.clone(),
321 log_store,
322 object_store_manager,
323 schema_metadata_manager,
324 plugins,
325 )
326 .await?,
327 config,
328 wal_raw_entry_reader,
329 })
330 }
331
332 async fn stop(&self) -> Result<()> {
334 self.workers.stop().await
335 }
336
337 fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
341 let region = self
343 .workers
344 .get_region(region_id)
345 .context(RegionNotFoundSnafu { region_id })?;
346 Ok(region.metadata())
347 }
348
349 async fn open_topic_regions(
350 &self,
351 topic: String,
352 region_requests: Vec<(RegionId, RegionOpenRequest)>,
353 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
354 let region_ids = region_requests
355 .iter()
356 .map(|(region_id, _)| *region_id)
357 .collect::<Vec<_>>();
358 let provider = Provider::kafka_provider(topic);
359 let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
360 provider,
361 self.wal_raw_entry_reader.clone(),
362 ®ion_ids,
363 DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
364 );
365
366 let mut responses = Vec::with_capacity(region_requests.len());
367 for ((region_id, request), entry_receiver) in
368 region_requests.into_iter().zip(entry_receivers)
369 {
370 let (request, receiver) =
371 WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
372 self.workers.submit_to_worker(region_id, request).await?;
373 responses.push(async move { receiver.await.context(RecvSnafu)? });
374 }
375
376 let distribution =
378 common_runtime::spawn_global(async move { distributor.distribute().await });
379 let responses = join_all(responses).await;
381
382 distribution.await.context(JoinSnafu)??;
383 Ok(region_ids.into_iter().zip(responses).collect())
384 }
385
386 async fn handle_batch_open_requests(
387 &self,
388 parallelism: usize,
389 requests: Vec<(RegionId, RegionOpenRequest)>,
390 ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
391 let semaphore = Arc::new(Semaphore::new(parallelism));
392 let (topic_to_region_requests, remaining_region_requests) =
393 prepare_batch_open_requests(requests)?;
394 let mut responses =
395 Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());
396
397 if !topic_to_region_requests.is_empty() {
398 let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
399 for (topic, region_requests) in topic_to_region_requests {
400 let semaphore_moved = semaphore.clone();
401 tasks.push(async move {
402 let _permit = semaphore_moved.acquire().await.unwrap();
404 self.open_topic_regions(topic, region_requests).await
405 })
406 }
407 let r = try_join_all(tasks).await?;
408 responses.extend(r.into_iter().flatten());
409 }
410
411 if !remaining_region_requests.is_empty() {
412 let mut tasks = Vec::with_capacity(remaining_region_requests.len());
413 let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
414 for (region_id, request) in remaining_region_requests {
415 let semaphore_moved = semaphore.clone();
416 region_ids.push(region_id);
417 tasks.push(async move {
418 let _permit = semaphore_moved.acquire().await.unwrap();
420 let (request, receiver) =
421 WorkerRequest::new_open_region_request(region_id, request, None);
422
423 self.workers.submit_to_worker(region_id, request).await?;
424
425 receiver.await.context(RecvSnafu)?
426 })
427 }
428
429 let results = join_all(tasks).await;
430 responses.extend(region_ids.into_iter().zip(results));
431 }
432
433 Ok(responses)
434 }
435
436 async fn handle_request(
438 &self,
439 region_id: RegionId,
440 request: RegionRequest,
441 ) -> Result<AffectedRows> {
442 let region_metadata = self.get_metadata(region_id).ok();
443 let (request, receiver) =
444 WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
445 self.workers.submit_to_worker(region_id, request).await?;
446
447 receiver.await.context(RecvSnafu)?
448 }
449
450 fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
451 let region = self
453 .workers
454 .get_region(region_id)
455 .context(RegionNotFoundSnafu { region_id })?;
456 let version_ctrl = ®ion.version_control;
457 let seq = Some(version_ctrl.committed_sequence());
458 Ok(seq)
459 }
460
461 fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
463 let query_start = Instant::now();
464 let region = self
466 .workers
467 .get_region(region_id)
468 .context(RegionNotFoundSnafu { region_id })?;
469 let version = region.version();
470 let cache_manager = self.workers.cache_manager();
472
473 let scan_region = ScanRegion::new(
474 version,
475 region.access_layer.clone(),
476 request,
477 CacheStrategy::EnableAll(cache_manager),
478 )
479 .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
480 .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
481 .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
482 .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
483 .with_start_time(query_start);
484
485 Ok(scan_region)
486 }
487
488 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
490 let region = self
491 .workers
492 .get_region(region_id)
493 .context(RegionNotFoundSnafu { region_id })?;
494
495 region.set_role(role);
496 Ok(())
497 }
498
499 async fn set_region_role_state_gracefully(
501 &self,
502 region_id: RegionId,
503 region_role_state: SettableRegionRoleState,
504 ) -> Result<SetRegionRoleStateResponse> {
505 let (request, receiver) =
508 WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
509 self.workers.submit_to_worker(region_id, request).await?;
510
511 receiver.await.context(RecvSnafu)
512 }
513
514 async fn sync_region(
515 &self,
516 region_id: RegionId,
517 manifest_info: RegionManifestInfo,
518 ) -> Result<(ManifestVersion, bool)> {
519 ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
520 let manifest_version = manifest_info.data_manifest_version();
521 let (request, receiver) =
522 WorkerRequest::new_sync_region_request(region_id, manifest_version);
523 self.workers.submit_to_worker(region_id, request).await?;
524
525 receiver.await.context(RecvSnafu)?
526 }
527
528 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
529 self.workers.get_region(region_id).map(|region| {
530 if region.is_follower() {
531 RegionRole::Follower
532 } else {
533 RegionRole::Leader
534 }
535 })
536 }
537}
538
539#[async_trait]
540impl RegionEngine for MitoEngine {
541 fn name(&self) -> &str {
542 MITO_ENGINE_NAME
543 }
544
545 #[tracing::instrument(skip_all)]
546 async fn handle_batch_open_requests(
547 &self,
548 parallelism: usize,
549 requests: Vec<(RegionId, RegionOpenRequest)>,
550 ) -> Result<BatchResponses, BoxedError> {
551 self.inner
553 .handle_batch_open_requests(parallelism, requests)
554 .await
555 .map(|responses| {
556 responses
557 .into_iter()
558 .map(|(region_id, response)| {
559 (
560 region_id,
561 response.map(RegionResponse::new).map_err(BoxedError::new),
562 )
563 })
564 .collect::<Vec<_>>()
565 })
566 .map_err(BoxedError::new)
567 }
568
569 #[tracing::instrument(skip_all)]
570 async fn handle_request(
571 &self,
572 region_id: RegionId,
573 request: RegionRequest,
574 ) -> Result<RegionResponse, BoxedError> {
575 let _timer = HANDLE_REQUEST_ELAPSED
576 .with_label_values(&[request.request_type()])
577 .start_timer();
578
579 let is_alter = matches!(request, RegionRequest::Alter(_));
580 let mut response = self
581 .inner
582 .handle_request(region_id, request)
583 .await
584 .map(RegionResponse::new)
585 .map_err(BoxedError::new)?;
586
587 if is_alter {
588 if let Some(statistic) = self.region_statistic(region_id) {
589 Self::encode_manifest_info_to_extensions(
590 ®ion_id,
591 statistic.manifest,
592 &mut response.extensions,
593 )
594 .map_err(BoxedError::new)?;
595 }
596 }
597
598 Ok(response)
599 }
600
601 #[tracing::instrument(skip_all)]
602 async fn handle_query(
603 &self,
604 region_id: RegionId,
605 request: ScanRequest,
606 ) -> Result<RegionScannerRef, BoxedError> {
607 self.scan_region(region_id, request)
608 .map_err(BoxedError::new)?
609 .region_scanner()
610 .map_err(BoxedError::new)
611 }
612
613 async fn get_last_seq_num(
614 &self,
615 region_id: RegionId,
616 ) -> Result<Option<SequenceNumber>, BoxedError> {
617 self.inner
618 .get_last_seq_num(region_id)
619 .map_err(BoxedError::new)
620 }
621
622 async fn get_metadata(
624 &self,
625 region_id: RegionId,
626 ) -> std::result::Result<RegionMetadataRef, BoxedError> {
627 self.inner.get_metadata(region_id).map_err(BoxedError::new)
628 }
629
630 async fn stop(&self) -> std::result::Result<(), BoxedError> {
636 self.inner.stop().await.map_err(BoxedError::new)
637 }
638
639 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
640 self.get_region_statistic(region_id)
641 }
642
643 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
644 self.inner
645 .set_region_role(region_id, role)
646 .map_err(BoxedError::new)
647 }
648
649 async fn set_region_role_state_gracefully(
650 &self,
651 region_id: RegionId,
652 region_role_state: SettableRegionRoleState,
653 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
654 let _timer = HANDLE_REQUEST_ELAPSED
655 .with_label_values(&["set_region_role_state_gracefully"])
656 .start_timer();
657
658 self.inner
659 .set_region_role_state_gracefully(region_id, region_role_state)
660 .await
661 .map_err(BoxedError::new)
662 }
663
664 async fn sync_region(
665 &self,
666 region_id: RegionId,
667 manifest_info: RegionManifestInfo,
668 ) -> Result<SyncManifestResponse, BoxedError> {
669 let (_, synced) = self
670 .inner
671 .sync_region(region_id, manifest_info)
672 .await
673 .map_err(BoxedError::new)?;
674
675 Ok(SyncManifestResponse::Mito { synced })
676 }
677
678 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
679 self.inner.role(region_id)
680 }
681
682 fn as_any(&self) -> &dyn Any {
683 self
684 }
685}
686
#[cfg(any(test, feature = "test"))]
// `new_for_test` takes eight injection points, which is more than clippy's default
// argument limit allows.
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Creates an engine for tests.
    ///
    /// Unlike [`MitoEngine::new`], this lets callers inject a write buffer manager, an
    /// event listener and a time provider into the worker group.
    ///
    /// # Errors
    /// Returns an error if sanitizing the config or starting the workers fails.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        let workers = WorkerGroup::start_for_test(
            config.clone(),
            log_store,
            object_store_manager,
            write_buffer_manager,
            listener,
            schema_metadata_manager,
            time_provider,
        )
        .await?;

        let inner = EngineInner {
            workers,
            config,
            wal_raw_entry_reader,
        };
        Ok(MitoEngine {
            inner: Arc::new(inner),
        })
    }

    /// Returns the purge scheduler used by the workers.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        let workers = &self.inner.workers;
        workers.purge_scheduler()
    }
}
729
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    /// The smallest edit `is_valid_region_edit` accepts: one added file, nothing else.
    fn valid_edit() -> RegionEdit {
        RegionEdit {
            files_to_add: vec![FileMeta::default()],
            files_to_remove: vec![],
            compaction_time_window: None,
            flushed_entry_id: None,
            flushed_sequence: None,
        }
    }

    #[test]
    fn test_is_valid_region_edit() {
        // Adding files and touching nothing else is accepted.
        assert!(is_valid_region_edit(&valid_edit()));

        // Nothing to add.
        let mut edit = valid_edit();
        edit.files_to_add.clear();
        assert!(!is_valid_region_edit(&edit));

        // Removing files is not allowed.
        let mut edit = valid_edit();
        edit.files_to_remove.push(FileMeta::default());
        assert!(!is_valid_region_edit(&edit));

        // A compaction time window is not allowed.
        let mut edit = valid_edit();
        edit.compaction_time_window = Some(Duration::from_secs(1));
        assert!(!is_valid_region_edit(&edit));

        // A flushed entry id is not allowed.
        let mut edit = valid_edit();
        edit.flushed_entry_id = Some(1);
        assert!(!is_valid_region_edit(&edit));

        // A flushed sequence is not allowed.
        let mut edit = valid_edit();
        edit.flushed_sequence = Some(1);
        assert!(!is_valid_region_edit(&edit));
    }
}