1#[cfg(test)]
18mod alter_test;
19#[cfg(test)]
20mod append_mode_test;
21#[cfg(test)]
22mod basic_test;
23#[cfg(test)]
24mod batch_open_test;
25#[cfg(test)]
26mod catchup_test;
27#[cfg(test)]
28mod close_test;
29#[cfg(test)]
30mod compaction_test;
31#[cfg(test)]
32mod create_test;
33#[cfg(test)]
34mod drop_test;
35#[cfg(test)]
36mod edit_region_test;
37#[cfg(test)]
38mod filter_deleted_test;
39#[cfg(test)]
40mod flush_test;
41#[cfg(any(test, feature = "test"))]
42pub mod listener;
43#[cfg(test)]
44mod merge_mode_test;
45#[cfg(test)]
46mod open_test;
47#[cfg(test)]
48mod parallel_test;
49#[cfg(test)]
50mod projection_test;
51#[cfg(test)]
52mod prune_test;
53#[cfg(test)]
54mod row_selector_test;
55#[cfg(test)]
56mod scan_test;
57#[cfg(test)]
58mod set_role_state_test;
59#[cfg(test)]
60mod sync_test;
61#[cfg(test)]
62mod truncate_test;
63
64use std::any::Any;
65use std::collections::HashMap;
66use std::sync::Arc;
67use std::time::Instant;
68
69use api::region::RegionResponse;
70use async_trait::async_trait;
71use common_base::Plugins;
72use common_error::ext::BoxedError;
73use common_meta::key::SchemaMetadataManagerRef;
74use common_recordbatch::SendableRecordBatchStream;
75use common_telemetry::{info, tracing};
76use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
77use futures::future::{join_all, try_join_all};
78use object_store::manager::ObjectStoreManagerRef;
79use snafu::{ensure, OptionExt, ResultExt};
80use store_api::codec::PrimaryKeyEncoding;
81use store_api::logstore::provider::Provider;
82use store_api::logstore::LogStore;
83use store_api::manifest::ManifestVersion;
84use store_api::metadata::RegionMetadataRef;
85use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
86use store_api::region_engine::{
87 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
88 RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
89};
90use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
91use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
92use tokio::sync::{oneshot, Semaphore};
93
94use crate::cache::CacheStrategy;
95use crate::config::MitoConfig;
96use crate::error::{
97 InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
98 SerdeJsonSnafu,
99};
100use crate::manifest::action::RegionEdit;
101use crate::metrics::HANDLE_REQUEST_ELAPSED;
102use crate::read::scan_region::{ScanRegion, Scanner};
103use crate::request::{RegionEditRequest, WorkerRequest};
104use crate::wal::entry_distributor::{
105 build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
106};
107use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader};
108use crate::worker::WorkerGroup;
109
/// Name of this engine, returned by `RegionEngine::name`.
pub const MITO_ENGINE_NAME: &str = "mito";
111
/// Region engine backed by the mito storage engine.
///
/// Cheap to clone: all state lives behind the shared [EngineInner].
#[derive(Clone)]
pub struct MitoEngine {
    /// Shared engine internals (worker group, config, raw WAL entry reader).
    inner: Arc<EngineInner>,
}
117
impl MitoEngine {
    /// Creates a new [MitoEngine].
    ///
    /// Sanitizes `config` against `data_home` before starting the inner engine.
    pub async fn new<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        Ok(MitoEngine {
            inner: Arc::new(
                EngineInner::new(
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    plugins,
                )
                .await?,
            ),
        })
    }

    /// Returns true if the specific region exists.
    pub fn is_region_exists(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_exists(region_id)
    }

    /// Returns true if the specific region is currently being opened.
    pub fn is_region_opening(&self, region_id: RegionId) -> bool {
        self.inner.workers.is_region_opening(region_id)
    }

    /// Returns the statistic of the region, or `None` if the region is not open.
    pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
        self.inner
            .workers
            .get_region(region_id)
            .map(|region| region.region_statistic())
    }

    /// Returns the primary key encoding of the region, or `None` if the region
    /// is not open.
    pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
        self.inner
            .workers
            .get_region(region_id)
            .map(|r| r.primary_key_encoding())
    }

    /// Scans the region with `request` and returns the results as a record
    /// batch stream.
    #[tracing::instrument(skip_all)]
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scanner(region_id, request)
            .map_err(BoxedError::new)?
            .scan()
            .await
    }

    /// Returns a scanner that executes `request` against the region.
    fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
        self.scan_region(region_id, request)?.scanner()
    }

    /// Builds the [ScanRegion] for `request`.
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        self.inner.scan_region(region_id, request)
    }

    /// Applies a manifest `edit` to the region directly.
    ///
    /// Only edits that add files — and change nothing else — are accepted
    /// (see [is_valid_region_edit]); anything else is rejected as an invalid
    /// request before reaching the worker.
    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
        let _timer = HANDLE_REQUEST_ELAPSED
            .with_label_values(&["edit_region"])
            .start_timer();

        ensure!(
            is_valid_region_edit(&edit),
            InvalidRequestSnafu {
                region_id,
                reason: "invalid region edit"
            }
        );

        let (tx, rx) = oneshot::channel();
        let request = WorkerRequest::EditRegion(RegionEditRequest {
            region_id,
            edit,
            tx,
        });
        self.inner
            .workers
            .submit_to_worker(region_id, request)
            .await?;
        // The worker replies with a Result, hence the trailing `?`.
        rx.await.context(RecvSnafu)?
    }

    /// Returns the region if it is open. Test-only accessor.
    #[cfg(test)]
    pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
        self.inner.workers.get_region(id)
    }

    /// Encodes the manifest info of a single region into the response
    /// `extensions` under [MANIFEST_INFO_EXTENSION_KEY].
    fn encode_manifest_info_to_extensions(
        region_id: &RegionId,
        manifest_info: RegionManifestInfo,
        extensions: &mut HashMap<String, Vec<u8>>,
    ) -> Result<()> {
        // The extension value is a JSON-encoded list of (region id, manifest info).
        let region_manifest_info = vec![(*region_id, manifest_info)];

        extensions.insert(
            MANIFEST_INFO_EXTENSION_KEY.to_string(),
            RegionManifestInfo::encode_list(&region_manifest_info).context(SerdeJsonSnafu)?,
        );
        info!(
            "Added manifest info: {:?} to extensions, region_id: {:?}",
            region_manifest_info, region_id
        );
        Ok(())
    }
}
249
250fn is_valid_region_edit(edit: &RegionEdit) -> bool {
252 !edit.files_to_add.is_empty()
253 && edit.files_to_remove.is_empty()
254 && matches!(
255 edit,
256 RegionEdit {
257 files_to_add: _,
258 files_to_remove: _,
259 compaction_time_window: None,
260 flushed_entry_id: None,
261 flushed_sequence: None,
262 }
263 )
264}
265
/// Internal state shared by all clones of [MitoEngine].
struct EngineInner {
    /// Region workers that own the regions and execute region requests.
    workers: WorkerGroup,
    /// Engine configuration (sanitized at construction time).
    config: Arc<MitoConfig>,
    /// Reader of raw WAL entries, used when batch-opening regions that share
    /// a Kafka topic.
    wal_raw_entry_reader: Arc<dyn RawEntryReader>,
}
275
/// Region open requests grouped by the Kafka topic their WAL entries live in.
type TopicGroupedRegionOpenRequests = HashMap<String, Vec<(RegionId, RegionOpenRequest)>>;
277
278fn prepare_batch_open_requests(
280 requests: Vec<(RegionId, RegionOpenRequest)>,
281) -> Result<(
282 TopicGroupedRegionOpenRequests,
283 Vec<(RegionId, RegionOpenRequest)>,
284)> {
285 let mut topic_to_regions: HashMap<String, Vec<(RegionId, RegionOpenRequest)>> = HashMap::new();
286 let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new();
287 for (region_id, request) in requests {
288 let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) {
289 serde_json::from_str(options).context(SerdeJsonSnafu)?
290 } else {
291 WalOptions::RaftEngine
292 };
293 match options {
294 WalOptions::Kafka(options) => {
295 topic_to_regions
296 .entry(options.topic)
297 .or_default()
298 .push((region_id, request));
299 }
300 WalOptions::RaftEngine | WalOptions::Noop => {
301 remaining_regions.push((region_id, request));
302 }
303 }
304 }
305
306 Ok((topic_to_regions, remaining_regions))
307}
308
impl EngineInner {
    /// Creates the inner engine: builds the raw WAL entry reader and starts
    /// the worker group on top of the given log store and object stores.
    async fn new<S: LogStore>(
        config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<EngineInner> {
        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(EngineInner {
            workers: WorkerGroup::start(
                config.clone(),
                log_store,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await?,
            config,
            wal_raw_entry_reader,
        })
    }

    /// Stops the engine by stopping all region workers.
    async fn stop(&self) -> Result<()> {
        self.workers.stop().await
    }

    /// Returns the metadata of the region, or `RegionNotFound` if it is not open.
    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
        // Reading region metadata doesn't need to go through the worker channel.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        Ok(region.metadata())
    }

    /// Opens all regions whose WAL entries live in the same Kafka `topic`.
    ///
    /// A single WAL entry distributor reads the topic once and fans entries
    /// out to one receiver per region, instead of each region re-reading the
    /// topic. Returns one open result per region, in `region_requests` order.
    async fn open_topic_regions(
        &self,
        topic: String,
        region_requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        let region_ids = region_requests
            .iter()
            .map(|(region_id, _)| *region_id)
            .collect::<Vec<_>>();
        let provider = Provider::kafka_provider(topic);
        let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            self.wal_raw_entry_reader.clone(),
            &region_ids,
            DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
        );

        // Submit one open request per region; each gets its own entry receiver.
        let mut responses = Vec::with_capacity(region_requests.len());
        for ((region_id, request), entry_receiver) in
            region_requests.into_iter().zip(entry_receivers)
        {
            let (request, receiver) =
                WorkerRequest::new_open_region_request(region_id, request, Some(entry_receiver));
            self.workers.submit_to_worker(region_id, request).await?;
            responses.push(async move { receiver.await.context(RecvSnafu)? });
        }

        // Run the distributor concurrently with the region opens: it feeds WAL
        // entries to the receivers the opens are draining.
        let distribution =
            common_runtime::spawn_global(async move { distributor.distribute().await });
        let responses = join_all(responses).await;

        // Surface both a join failure and a distribution failure.
        distribution.await.context(JoinSnafu)??;
        Ok(region_ids.into_iter().zip(responses).collect())
    }

    /// Opens regions in batch with at most `parallelism` concurrent tasks.
    ///
    /// Requests are first split by WAL kind: regions sharing a Kafka topic are
    /// opened together (one topic read for the whole group), the rest are
    /// opened individually. Returns one result per requested region.
    async fn handle_batch_open_requests(
        &self,
        parallelism: usize,
        requests: Vec<(RegionId, RegionOpenRequest)>,
    ) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
        // Caps how many open tasks run at once across both phases.
        let semaphore = Arc::new(Semaphore::new(parallelism));
        let (topic_to_region_requests, remaining_region_requests) =
            prepare_batch_open_requests(requests)?;
        let mut responses =
            Vec::with_capacity(topic_to_region_requests.len() + remaining_region_requests.len());

        if !topic_to_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(topic_to_region_requests.len());
            for (topic, region_requests) in topic_to_region_requests {
                let semaphore_moved = semaphore.clone();
                tasks.push(async move {
                    // The semaphore is never closed, so acquire() cannot fail.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    self.open_topic_regions(topic, region_requests).await
                })
            }
            // NOTE: try_join_all short-circuits — a failure in any topic group
            // aborts the whole batch open.
            let r = try_join_all(tasks).await?;
            responses.extend(r.into_iter().flatten());
        }

        if !remaining_region_requests.is_empty() {
            let mut tasks = Vec::with_capacity(remaining_region_requests.len());
            let mut region_ids = Vec::with_capacity(remaining_region_requests.len());
            for (region_id, request) in remaining_region_requests {
                let semaphore_moved = semaphore.clone();
                region_ids.push(region_id);
                tasks.push(async move {
                    // The semaphore is never closed, so acquire() cannot fail.
                    let _permit = semaphore_moved.acquire().await.unwrap();
                    let (request, receiver) =
                        WorkerRequest::new_open_region_request(region_id, request, None);

                    self.workers.submit_to_worker(region_id, request).await?;

                    receiver.await.context(RecvSnafu)?
                })
            }

            // Unlike the topic phase, failures here are kept per region.
            let results = join_all(tasks).await;
            responses.extend(region_ids.into_iter().zip(results));
        }

        Ok(responses)
    }

    /// Handles a region request by converting it into a worker request and
    /// waiting for the worker's response.
    async fn handle_request(
        &self,
        region_id: RegionId,
        request: RegionRequest,
    ) -> Result<AffectedRows> {
        // Metadata is optional: the region may not exist yet (e.g. create/open).
        let region_metadata = self.get_metadata(region_id).ok();
        let (request, receiver) =
            WorkerRequest::try_from_region_request(region_id, request, region_metadata)?;
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    /// Returns the last committed sequence number of the region.
    fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
        // Reading the sequence doesn't need to go through the worker channel.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version_ctrl = &region.version_control;
        let seq = Some(version_ctrl.committed_sequence());
        Ok(seq)
    }

    /// Handles the scan `request` and returns a [ScanRegion].
    fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
        let query_start = Instant::now();
        // Reading a region doesn't need to go through the worker channel.
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;
        let version = region.version();
        let cache_manager = self.workers.cache_manager();

        // Each index kind can be disabled for queries via config.
        let scan_region = ScanRegion::new(
            version,
            region.access_layer.clone(),
            request,
            CacheStrategy::EnableAll(cache_manager),
        )
        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
        .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
        .with_start_time(query_start);

        Ok(scan_region)
    }

    /// Converts the region to the given `role` immediately.
    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
        let region = self
            .workers
            .get_region(region_id)
            .context(RegionNotFoundSnafu { region_id })?;

        region.set_role(role);
        Ok(())
    }

    /// Sets the role state of the region gracefully through the worker.
    ///
    /// The worker sends the response itself over the channel, so the awaited
    /// value is already the final result (note: no trailing `?` here, unlike
    /// the other request paths).
    async fn set_region_role_state_gracefully(
        &self,
        region_id: RegionId,
        region_role_state: SettableRegionRoleState,
    ) -> Result<SetRegionRoleStateResponse> {
        let (request, receiver) =
            WorkerRequest::new_set_readonly_gracefully(region_id, region_role_state);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)
    }

    /// Syncs the region's manifest to `manifest_info`'s data manifest version.
    ///
    /// Returns the manifest version after the sync and whether the region was
    /// actually synced. Rejects non-mito manifest infos.
    async fn sync_region(
        &self,
        region_id: RegionId,
        manifest_info: RegionManifestInfo,
    ) -> Result<(ManifestVersion, bool)> {
        ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
        let manifest_version = manifest_info.data_manifest_version();
        let (request, receiver) =
            WorkerRequest::new_sync_region_request(region_id, manifest_version);
        self.workers.submit_to_worker(region_id, request).await?;

        receiver.await.context(RecvSnafu)?
    }

    /// Returns the role of the region, or `None` if the region is not open.
    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
        self.workers.get_region(region_id).map(|region| {
            if region.is_follower() {
                RegionRole::Follower
            } else {
                RegionRole::Leader
            }
        })
    }
}
540
541#[async_trait]
542impl RegionEngine for MitoEngine {
543 fn name(&self) -> &str {
544 MITO_ENGINE_NAME
545 }
546
547 #[tracing::instrument(skip_all)]
548 async fn handle_batch_open_requests(
549 &self,
550 parallelism: usize,
551 requests: Vec<(RegionId, RegionOpenRequest)>,
552 ) -> Result<BatchResponses, BoxedError> {
553 self.inner
555 .handle_batch_open_requests(parallelism, requests)
556 .await
557 .map(|responses| {
558 responses
559 .into_iter()
560 .map(|(region_id, response)| {
561 (
562 region_id,
563 response.map(RegionResponse::new).map_err(BoxedError::new),
564 )
565 })
566 .collect::<Vec<_>>()
567 })
568 .map_err(BoxedError::new)
569 }
570
571 #[tracing::instrument(skip_all)]
572 async fn handle_request(
573 &self,
574 region_id: RegionId,
575 request: RegionRequest,
576 ) -> Result<RegionResponse, BoxedError> {
577 let _timer = HANDLE_REQUEST_ELAPSED
578 .with_label_values(&[request.request_type()])
579 .start_timer();
580
581 let is_alter = matches!(request, RegionRequest::Alter(_));
582 let mut response = self
583 .inner
584 .handle_request(region_id, request)
585 .await
586 .map(RegionResponse::new)
587 .map_err(BoxedError::new)?;
588
589 if is_alter {
590 if let Some(statistic) = self.region_statistic(region_id) {
591 Self::encode_manifest_info_to_extensions(
592 ®ion_id,
593 statistic.manifest,
594 &mut response.extensions,
595 )
596 .map_err(BoxedError::new)?;
597 }
598 }
599
600 Ok(response)
601 }
602
603 #[tracing::instrument(skip_all)]
604 async fn handle_query(
605 &self,
606 region_id: RegionId,
607 request: ScanRequest,
608 ) -> Result<RegionScannerRef, BoxedError> {
609 self.scan_region(region_id, request)
610 .map_err(BoxedError::new)?
611 .region_scanner()
612 .map_err(BoxedError::new)
613 }
614
615 async fn get_last_seq_num(
616 &self,
617 region_id: RegionId,
618 ) -> Result<Option<SequenceNumber>, BoxedError> {
619 self.inner
620 .get_last_seq_num(region_id)
621 .map_err(BoxedError::new)
622 }
623
624 async fn get_metadata(
626 &self,
627 region_id: RegionId,
628 ) -> std::result::Result<RegionMetadataRef, BoxedError> {
629 self.inner.get_metadata(region_id).map_err(BoxedError::new)
630 }
631
632 async fn stop(&self) -> std::result::Result<(), BoxedError> {
638 self.inner.stop().await.map_err(BoxedError::new)
639 }
640
641 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
642 self.get_region_statistic(region_id)
643 }
644
645 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
646 self.inner
647 .set_region_role(region_id, role)
648 .map_err(BoxedError::new)
649 }
650
651 async fn set_region_role_state_gracefully(
652 &self,
653 region_id: RegionId,
654 region_role_state: SettableRegionRoleState,
655 ) -> Result<SetRegionRoleStateResponse, BoxedError> {
656 let _timer = HANDLE_REQUEST_ELAPSED
657 .with_label_values(&["set_region_role_state_gracefully"])
658 .start_timer();
659
660 self.inner
661 .set_region_role_state_gracefully(region_id, region_role_state)
662 .await
663 .map_err(BoxedError::new)
664 }
665
666 async fn sync_region(
667 &self,
668 region_id: RegionId,
669 manifest_info: RegionManifestInfo,
670 ) -> Result<SyncManifestResponse, BoxedError> {
671 let (_, synced) = self
672 .inner
673 .sync_region(region_id, manifest_info)
674 .await
675 .map_err(BoxedError::new)?;
676
677 Ok(SyncManifestResponse::Mito { synced })
678 }
679
680 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
681 self.inner.role(region_id)
682 }
683
684 fn as_any(&self) -> &dyn Any {
685 self
686 }
687}
688
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]
impl MitoEngine {
    /// Returns an engine for tests, allowing callers to inject a custom write
    /// buffer manager, event listener and time provider into the worker group.
    pub async fn new_for_test<S: LogStore>(
        data_home: &str,
        mut config: MitoConfig,
        log_store: Arc<S>,
        object_store_manager: ObjectStoreManagerRef,
        write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
        listener: Option<crate::engine::listener::EventListenerRef>,
        time_provider: crate::time_provider::TimeProviderRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
    ) -> Result<MitoEngine> {
        config.sanitize(data_home)?;

        let config = Arc::new(config);
        let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
        Ok(MitoEngine {
            inner: Arc::new(EngineInner {
                workers: WorkerGroup::start_for_test(
                    config.clone(),
                    log_store,
                    object_store_manager,
                    write_buffer_manager,
                    listener,
                    schema_metadata_manager,
                    time_provider,
                )
                .await?,
                config,
                wal_raw_entry_reader,
            }),
        })
    }

    /// Returns the purge scheduler of the worker group. Test-only accessor.
    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
        self.inner.workers.purge_scheduler()
    }
}
731
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::sst::file::FileMeta;

    #[test]
    fn test_is_valid_region_edit() {
        // Builds an edit with the given file lists and everything else unset.
        fn edit_with(files_to_add: Vec<FileMeta>, files_to_remove: Vec<FileMeta>) -> RegionEdit {
            RegionEdit {
                files_to_add,
                files_to_remove,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
            }
        }

        // Adding files only: valid.
        let edit = edit_with(vec![FileMeta::default()], vec![]);
        assert!(is_valid_region_edit(&edit));

        // No files to add: invalid.
        let edit = edit_with(vec![], vec![]);
        assert!(!is_valid_region_edit(&edit));

        // Removing files as well: invalid.
        let edit = edit_with(vec![FileMeta::default()], vec![FileMeta::default()]);
        assert!(!is_valid_region_edit(&edit));

        // Any extra metadata field makes the edit invalid.
        let mut edit = edit_with(vec![FileMeta::default()], vec![]);
        edit.compaction_time_window = Some(Duration::from_secs(1));
        assert!(!is_valid_region_edit(&edit));

        let mut edit = edit_with(vec![FileMeta::default()], vec![]);
        edit.flushed_entry_id = Some(1);
        assert!(!is_valid_region_edit(&edit));

        let mut edit = edit_with(vec![FileMeta::default()], vec![]);
        edit.flushed_sequence = Some(1);
        assert!(!is_valid_region_edit(&edit));
    }
}