1mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod staging;
27mod state;
28mod sync;
29
30use std::any::Any;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33
34use api::region::RegionResponse;
35use async_trait::async_trait;
36use common_error::ext::{BoxedError, ErrorExt};
37use common_error::status_code::StatusCode;
38use common_runtime::RepeatedTask;
39use mito2::engine::MitoEngine;
40pub(crate) use options::IndexOptions;
41use snafu::{OptionExt, ResultExt};
42pub(crate) use state::MetricEngineState;
43use store_api::metadata::RegionMetadataRef;
44use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
45use store_api::region_engine::{
46 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
47 RegionStatistic, RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
48 SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse,
49};
50use store_api::region_request::{
51 BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
52};
53use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
54
55use crate::config::EngineConfig;
56use crate::data_region::DataRegion;
57use crate::error::{
58 self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu,
59 UnsupportedRemapManifestsRequestSnafu,
60};
61use crate::metadata_region::MetadataRegion;
62use crate::repeated_task::FlushMetadataRegionTask;
63use crate::row_modifier::RowModifier;
64use crate::utils::{self, get_region_statistic};
65
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Public handle of the metric engine; this is the type that implements `RegionEngine`.
///
/// The handle only holds an `Arc` to the shared `MetricEngineInner`, so cloning it yields
/// another handle to the same inner engine rather than a new engine.
#[derive(Clone)]
pub struct MetricEngine {
    /// Shared engine internals; the methods of this type delegate to it.
    inner: Arc<MetricEngineInner>,
}
132
133#[async_trait]
134impl RegionEngine for MetricEngine {
    /// Returns the name of this engine, i.e. the `METRIC_ENGINE_NAME` constant.
    fn name(&self) -> &str {
        METRIC_ENGINE_NAME
    }
139
140 async fn handle_batch_open_requests(
141 &self,
142 parallelism: usize,
143 requests: Vec<(RegionId, RegionOpenRequest)>,
144 ) -> Result<BatchResponses, BoxedError> {
145 self.inner
146 .handle_batch_open_requests(parallelism, requests)
147 .await
148 .map_err(BoxedError::new)
149 }
150
151 async fn handle_batch_catchup_requests(
152 &self,
153 parallelism: usize,
154 requests: Vec<(RegionId, RegionCatchupRequest)>,
155 ) -> Result<BatchResponses, BoxedError> {
156 self.inner
157 .handle_batch_catchup_requests(parallelism, requests)
158 .await
159 .map_err(BoxedError::new)
160 }
161
162 async fn handle_batch_ddl_requests(
163 &self,
164 batch_request: BatchRegionDdlRequest,
165 ) -> Result<RegionResponse, BoxedError> {
166 match batch_request {
167 BatchRegionDdlRequest::Create(requests) => {
168 let mut extension_return_value = HashMap::new();
169 let rows = self
170 .inner
171 .create_regions(requests, &mut extension_return_value)
172 .await
173 .map_err(BoxedError::new)?;
174
175 Ok(RegionResponse {
176 affected_rows: rows,
177 extensions: extension_return_value,
178 metadata: Vec::new(),
179 })
180 }
181 BatchRegionDdlRequest::Alter(requests) => {
182 let mut extension_return_value = HashMap::new();
183 let rows = self
184 .inner
185 .alter_regions(requests, &mut extension_return_value)
186 .await
187 .map_err(BoxedError::new)?;
188
189 Ok(RegionResponse {
190 affected_rows: rows,
191 extensions: extension_return_value,
192 metadata: Vec::new(),
193 })
194 }
195 BatchRegionDdlRequest::Drop(requests) => {
196 self.handle_requests(
197 requests
198 .into_iter()
199 .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
200 )
201 .await
202 }
203 }
204 }
205
206 async fn handle_request(
208 &self,
209 region_id: RegionId,
210 request: RegionRequest,
211 ) -> Result<RegionResponse, BoxedError> {
212 let mut extension_return_value = HashMap::new();
213
214 let result = match request {
215 RegionRequest::EnterStaging(_) => {
216 if self.inner.is_physical_region(region_id) {
217 self.handle_enter_staging_request(region_id, request).await
218 } else {
219 UnsupportedRegionRequestSnafu { request }.fail()
220 }
221 }
222 RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
223 RegionRequest::Create(create) => {
224 self.inner
225 .create_regions(vec![(region_id, create)], &mut extension_return_value)
226 .await
227 }
228 RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
229 RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
230 RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
231 RegionRequest::Alter(alter) => {
232 self.inner
233 .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
234 .await
235 }
236 RegionRequest::Compact(_) => {
237 if self.inner.is_physical_region(region_id) {
238 self.inner
239 .mito
240 .handle_request(region_id, request)
241 .await
242 .context(error::MitoFlushOperationSnafu)
243 .map(|response| response.affected_rows)
244 } else {
245 UnsupportedRegionRequestSnafu { request }.fail()
246 }
247 }
248 RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
249 RegionRequest::BuildIndex(_) => {
250 if self.inner.is_physical_region(region_id) {
251 self.inner
252 .mito
253 .handle_request(region_id, request)
254 .await
255 .context(error::MitoFlushOperationSnafu)
256 .map(|response| response.affected_rows)
257 } else {
258 UnsupportedRegionRequestSnafu { request }.fail()
259 }
260 }
261 RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
262 RegionRequest::Delete(delete) => self.inner.delete_region(region_id, delete).await,
263 RegionRequest::Catchup(_) => {
264 let mut response = self
265 .inner
266 .handle_batch_catchup_requests(
267 1,
268 vec![(region_id, RegionCatchupRequest::default())],
269 )
270 .await
271 .map_err(BoxedError::new)?;
272 debug_assert_eq!(response.len(), 1);
273 let (resp_region_id, response) = response
274 .pop()
275 .context(error::UnexpectedRequestSnafu {
276 reason: "expected 1 response, but got zero responses",
277 })
278 .map_err(BoxedError::new)?;
279 debug_assert_eq!(region_id, resp_region_id);
280 return response;
281 }
282 RegionRequest::BulkInserts(_) => {
283 UnsupportedRegionRequestSnafu { request }.fail()
285 }
286 };
287
288 result.map_err(BoxedError::new).map(|rows| RegionResponse {
289 affected_rows: rows,
290 extensions: extension_return_value,
291 metadata: Vec::new(),
292 })
293 }
294
295 async fn handle_query(
296 &self,
297 region_id: RegionId,
298 request: ScanRequest,
299 ) -> Result<RegionScannerRef, BoxedError> {
300 self.handle_query(region_id, request).await
301 }
302
303 async fn get_committed_sequence(
304 &self,
305 region_id: RegionId,
306 ) -> Result<SequenceNumber, BoxedError> {
307 self.inner
308 .get_last_seq_num(region_id)
309 .await
310 .map_err(BoxedError::new)
311 }
312
313 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
315 self.inner
316 .load_region_metadata(region_id)
317 .await
318 .map_err(BoxedError::new)
319 }
320
321 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
325 if self.inner.is_physical_region(region_id) {
326 get_region_statistic(&self.inner.mito, region_id)
327 } else {
328 None
329 }
330 }
331
    /// Stops the engine.
    ///
    /// This is currently a no-op that always returns `Ok(())`.
    ///
    /// NOTE(review): neither the wrapped mito engine nor `flush_task` is stopped here —
    /// confirm that their shutdown is handled elsewhere.
    async fn stop(&self) -> Result<(), BoxedError> {
        Ok(())
    }
337
338 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
339 for x in [
341 utils::to_metadata_region_id(region_id),
342 utils::to_data_region_id(region_id),
343 ] {
344 if let Err(e) = self.inner.mito.set_region_role(x, role)
345 && e.status_code() != StatusCode::RegionNotFound
346 {
347 return Err(e);
348 }
349 }
350 Ok(())
351 }
352
353 async fn sync_region(
354 &self,
355 region_id: RegionId,
356 manifest_info: RegionManifestInfo,
357 ) -> Result<SyncManifestResponse, BoxedError> {
358 self.inner
359 .sync_region(region_id, manifest_info)
360 .await
361 .map_err(BoxedError::new)
362 }
363
364 async fn remap_manifests(
365 &self,
366 request: RemapManifestsRequest,
367 ) -> Result<RemapManifestsResponse, BoxedError> {
368 let region_id = request.region_id;
369 if self.inner.is_physical_region(region_id) {
370 self.inner.mito.remap_manifests(request).await
371 } else {
372 Err(BoxedError::new(
373 UnsupportedRemapManifestsRequestSnafu { region_id }.build(),
374 ))
375 }
376 }
377
378 async fn set_region_role_state_gracefully(
379 &self,
380 region_id: RegionId,
381 region_role_state: SettableRegionRoleState,
382 ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
383 let metadata_result = match self
384 .inner
385 .mito
386 .set_region_role_state_gracefully(
387 utils::to_metadata_region_id(region_id),
388 region_role_state,
389 )
390 .await?
391 {
392 SetRegionRoleStateResponse::Success(success) => success,
393 SetRegionRoleStateResponse::NotFound => {
394 return Ok(SetRegionRoleStateResponse::NotFound);
395 }
396 SetRegionRoleStateResponse::InvalidTransition(error) => {
397 return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
398 }
399 };
400
401 let data_result = match self
402 .inner
403 .mito
404 .set_region_role_state_gracefully(region_id, region_role_state)
405 .await?
406 {
407 SetRegionRoleStateResponse::Success(success) => success,
408 SetRegionRoleStateResponse::NotFound => {
409 return Ok(SetRegionRoleStateResponse::NotFound);
410 }
411 SetRegionRoleStateResponse::InvalidTransition(error) => {
412 return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
413 }
414 };
415
416 Ok(SetRegionRoleStateResponse::success(
417 SetRegionRoleStateSuccess::metric(
418 data_result.last_entry_id().unwrap_or_default(),
419 metadata_result.last_entry_id().unwrap_or_default(),
420 ),
421 ))
422 }
423
424 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
428 if self.inner.is_physical_region(region_id) {
429 self.inner.mito.role(region_id)
430 } else {
431 None
432 }
433 }
434
    /// Exposes this engine as `&dyn Any` so that callers can downcast it to `MetricEngine`.
    fn as_any(&self) -> &dyn Any {
        self
    }
438}
439
440impl MetricEngine {
441 pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
442 let metadata_region = MetadataRegion::new(mito.clone());
443 let data_region = DataRegion::new(mito.clone());
444 let state = Arc::new(RwLock::default());
445 config.sanitize();
446 let flush_interval = config.flush_metadata_region_interval;
447 let inner = Arc::new(MetricEngineInner {
448 mito: mito.clone(),
449 metadata_region,
450 data_region,
451 state: state.clone(),
452 config,
453 row_modifier: RowModifier::default(),
454 flush_task: RepeatedTask::new(
455 flush_interval,
456 Box::new(FlushMetadataRegionTask {
457 state: state.clone(),
458 mito: mito.clone(),
459 }),
460 ),
461 });
462 inner
463 .flush_task
464 .start(common_runtime::global_runtime())
465 .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
466 Ok(Self { inner })
467 }
468
    /// Returns a clone of the underlying `MitoEngine` handle.
    pub fn mito(&self) -> MitoEngine {
        self.inner.mito.clone()
    }
472
473 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
475 self.inner
476 .metadata_region
477 .logical_regions(physical_region_id)
478 .await
479 }
480
481 async fn handle_query(
483 &self,
484 region_id: RegionId,
485 request: ScanRequest,
486 ) -> Result<RegionScannerRef, BoxedError> {
487 self.inner
488 .read_region(region_id, request)
489 .await
490 .map_err(BoxedError::new)
491 }
492
493 async fn handle_requests(
494 &self,
495 requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
496 ) -> Result<RegionResponse, BoxedError> {
497 let mut affected_rows = 0;
498 let mut extensions = HashMap::new();
499 for (region_id, request) in requests {
500 let response = self.handle_request(region_id, request).await?;
501 affected_rows += response.affected_rows;
502 extensions.extend(response.extensions);
503 }
504
505 Ok(RegionResponse {
506 affected_rows,
507 extensions,
508 metadata: Vec::new(),
509 })
510 }
511}
512
513#[cfg(test)]
514impl MetricEngine {
    /// Test-only helper: scans `region_id` with `request` and returns the result as a
    /// record batch stream, delegating to the inner engine's `scan_to_stream`.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        self.inner.scan_to_stream(region_id, request).await
    }
522
    /// Test-only accessor for the engine configuration stored in the inner engine.
    pub fn config(&self) -> &EngineConfig {
        &self.inner.config
    }
527}
528
/// Shared internals of `MetricEngine`; every clone of the engine points at the same instance.
struct MetricEngineInner {
    /// The underlying mito engine.
    mito: MitoEngine,
    /// Metadata-region accessor, built from a clone of `mito`.
    metadata_region: MetadataRegion,
    /// Data-region accessor, built from a clone of `mito`.
    data_region: DataRegion,
    /// Engine state; the same `Arc` is also handed to the background flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration, sanitized at construction time.
    config: EngineConfig,
    /// Row modifier, created with its default value.
    /// NOTE(review): its usage is not visible in this part of the file.
    row_modifier: RowModifier,
    /// Background task that runs `FlushMetadataRegionTask` at the configured interval;
    /// it is started when the engine is constructed.
    flush_task: RepeatedTask<Error>,
}
538
539#[cfg(test)]
540mod test {
541 use std::collections::HashMap;
542
543 use common_telemetry::info;
544 use common_wal::options::{KafkaWalOptions, WalOptions};
545 use mito2::sst::location::region_dir_from_table_dir;
546 use mito2::test_util::{kafka_log_store_factory, prepare_test_for_kafka_log_store};
547 use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
548 use store_api::mito_engine_options::WAL_OPTIONS_KEY;
549 use store_api::region_request::{
550 PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
551 };
552
553 use super::*;
554 use crate::maybe_skip_kafka_log_store_integration_test;
555 use crate::test_util::TestEnv;
556
    /// Closing and reopening the default physical region succeeds, and closing or opening a
    /// region id that was never created does not report an error either.
    #[tokio::test]
    async fn close_open_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        // Close the default physical region.
        engine
            .handle_request(
                physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // Reopen it; the options carry the physical-table marker key with an empty value.
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare, options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap();

        // Closing a region that was never created is expected to succeed.
        let nonexistent_region_id = RegionId::new(12313, 12);
        engine
            .handle_request(
                nonexistent_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // Opening that same region without the physical-table option is expected to
        // succeed as well.
        let invalid_open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare, options: HashMap::new(),
            skip_wal_replay: false,
            checkpoint: None,
        };
        engine
            .handle_request(
                nonexistent_region_id,
                RegionRequest::Open(invalid_open_request),
            )
            .await
            .unwrap();
    }
617
618 #[tokio::test]
619 async fn test_role() {
620 let env = TestEnv::new().await;
621 env.init_metric_region().await;
622
623 let logical_region_id = env.default_logical_region_id();
624 let physical_region_id = env.default_physical_region_id();
625
626 assert!(env.metric().role(logical_region_id).is_none());
627 assert!(env.metric().role(physical_region_id).is_some());
628 }
629
630 #[tokio::test]
631 async fn test_region_disk_usage() {
632 let env = TestEnv::new().await;
633 env.init_metric_region().await;
634
635 let logical_region_id = env.default_logical_region_id();
636 let physical_region_id = env.default_physical_region_id();
637
638 assert!(env.metric().region_statistic(logical_region_id).is_none());
639 assert!(env.metric().region_statistic(physical_region_id).is_some());
640 }
641
    /// After the parquet files of the metadata region are deleted, reopening the physical
    /// region fails with `StorageUnavailable`, and neither underlying mito region is left
    /// registered afterwards.
    #[tokio::test]
    async fn test_open_region_failure() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();

        // Flush the physical region first.
        // NOTE(review): presumably this is what produces the parquet files deleted below.
        let metric_engine = env.metric();
        metric_engine
            .handle_request(
                physical_region_id,
                RegionRequest::Flush(RegionFlushRequest {
                    row_group_size: None,
                }),
            )
            .await
            .unwrap();

        // Delete every parquet file found directly under the metadata region directory.
        let path = region_dir_from_table_dir(
            &TestEnv::default_table_dir(),
            physical_region_id,
            PathType::Metadata,
        );
        let object_store = env.get_object_store().unwrap();
        let list = object_store.list(&path).await.unwrap();
        for entry in list {
            // Directories are skipped; only files are candidates for deletion.
            if entry.metadata().is_dir() {
                continue;
            }
            if entry.name().ends_with("parquet") {
                info!("deleting {}", entry.path());
                object_store.delete(entry.path()).await.unwrap();
            }
        }

        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        // The region has not been closed yet, and this open is expected to succeed.
        // NOTE(review): presumably opening an already-open region does not touch the
        // deleted files — confirm against `open_region`.
        metric_engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap();

        // Close the region so that the next open has to load it again.
        metric_engine
            .handle_request(
                physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // Reopening now is expected to fail because the files are gone.
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        let err = metric_engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);

        // After the failed open, mito knows neither the data region nor the metadata region.
        let mito_engine = metric_engine.mito();
        let data_region_id = utils::to_data_region_id(physical_region_id);
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
        let err = mito_engine
            .get_metadata(metadata_region_id)
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
    }
735
    /// Batch-opening physical regions with `skip_wal_replay: true` leaves their logical
    /// regions absent from the engine state; a subsequent batch catchup makes them present.
    ///
    /// NOTE(review): judging by the macro name, the test is skipped unless a Kafka log store
    /// is configured for integration tests — confirm against the macro definition.
    #[tokio::test]
    async fn test_catchup_regions() {
        common_telemetry::init_default_ut_logging();
        maybe_skip_kafka_log_store_integration_test!();
        let kafka_log_store_factory = kafka_log_store_factory().unwrap();
        let mito_env = mito2::test_util::TestEnv::new()
            .await
            .with_log_store_factory(kafka_log_store_factory.clone());
        let env = TestEnv::with_mito_env(mito_env).await;
        let table_dir = |region_id| format!("table/{region_id}");
        let mut physical_region_ids = vec![];
        let mut logical_region_ids = vec![];

        let num_topics = 3;
        let num_physical_regions = 8;
        let num_logical_regions = 16;
        let parallelism = 2;
        let mut topics = Vec::with_capacity(num_topics);
        for _ in 0..num_topics {
            let topic = prepare_test_for_kafka_log_store(&kafka_log_store_factory)
                .await
                .unwrap();
            topics.push(topic);
        }

        // Physical regions are spread over the topics round-robin by region number.
        let topic_idx = |id| (id as usize) % num_topics;
        // Create the physical regions, each with its own set of logical regions.
        for i in 0..num_physical_regions {
            let physical_region_id = RegionId::new(1, i);
            physical_region_ids.push(physical_region_id);

            let wal_options = WalOptions::Kafka(KafkaWalOptions {
                topic: topics[topic_idx(i)].clone(),
            });
            env.create_physical_region(
                physical_region_id,
                &table_dir(physical_region_id),
                vec![(
                    WAL_OPTIONS_KEY.to_string(),
                    serde_json::to_string(&wal_options).unwrap(),
                )],
            )
            .await;
            // Logical regions of physical region `i` use table id `1024 + i`.
            for j in 0..num_logical_regions {
                let logical_region_id = RegionId::new(1024 + i, j);
                logical_region_ids.push(logical_region_id);
                env.create_logical_region(physical_region_id, logical_region_id)
                    .await;
            }
        }

        let metric_engine = env.metric();
        // Close every region, logical ones first and physical ones afterwards.
        for region_id in logical_region_ids.iter().chain(physical_region_ids.iter()) {
            metric_engine
                .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
                .await
                .unwrap();
        }

        // Build one open request per physical region with the same topic it was created
        // with (its index equals its region number) and with WAL replay skipped.
        let requests = physical_region_ids
            .iter()
            .enumerate()
            .map(|(idx, region_id)| {
                let mut options = HashMap::new();
                let wal_options = WalOptions::Kafka(KafkaWalOptions {
                    topic: topics[topic_idx(idx as u32)].clone(),
                });
                options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new());
                options.insert(
                    WAL_OPTIONS_KEY.to_string(),
                    serde_json::to_string(&wal_options).unwrap(),
                );
                (
                    *region_id,
                    RegionOpenRequest {
                        engine: METRIC_ENGINE_NAME.to_string(),
                        table_dir: table_dir(*region_id),
                        path_type: PathType::Bare,
                        options: options.clone(),
                        skip_wal_replay: true,
                        checkpoint: None,
                    },
                )
            })
            .collect::<Vec<_>>();
        info!("Open batch regions with parallelism: {parallelism}");
        metric_engine
            .handle_batch_open_requests(parallelism, requests)
            .await
            .unwrap();
        {
            // Right after the open, none of the logical regions is present in the state.
            let state = metric_engine.inner.state.read().unwrap();
            for logical_region in &logical_region_ids {
                assert!(!state.logical_regions().contains_key(logical_region));
            }
        }

        // Catch up every physical region, asking for it to become writable.
        let catch_requests = physical_region_ids
            .iter()
            .map(|region_id| {
                (
                    *region_id,
                    RegionCatchupRequest {
                        set_writable: true,
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        metric_engine
            .handle_batch_catchup_requests(parallelism, catch_requests)
            .await
            .unwrap();
        {
            // After catchup, every logical region is present in the state.
            let state = metric_engine.inner.state.read().unwrap();
            for logical_region in &logical_region_ids {
                assert!(state.logical_regions().contains_key(logical_region));
            }
        }
    }
859}