1mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46 RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47 SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
#[cfg_attr(doc, aquamarine::aquamarine)]
/// The metric engine: a [`RegionEngine`] implementation layered on top of a [`MitoEngine`].
///
/// The handle is cheap to clone: every clone shares the same `MetricEngineInner`
/// through an [`Arc`].
#[derive(Clone)]
pub struct MetricEngine {
    /// Shared engine internals.
    inner: Arc<MetricEngineInner>,
}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
129 fn name(&self) -> &str {
131 METRIC_ENGINE_NAME
132 }
133
134 async fn handle_batch_open_requests(
135 &self,
136 parallelism: usize,
137 requests: Vec<(RegionId, RegionOpenRequest)>,
138 ) -> Result<BatchResponses, BoxedError> {
139 self.inner
140 .handle_batch_open_requests(parallelism, requests)
141 .await
142 .map_err(BoxedError::new)
143 }
144
145 async fn handle_batch_ddl_requests(
146 &self,
147 batch_request: BatchRegionDdlRequest,
148 ) -> Result<RegionResponse, BoxedError> {
149 match batch_request {
150 BatchRegionDdlRequest::Create(requests) => {
151 let mut extension_return_value = HashMap::new();
152 let rows = self
153 .inner
154 .create_regions(requests, &mut extension_return_value)
155 .await
156 .map_err(BoxedError::new)?;
157
158 Ok(RegionResponse {
159 affected_rows: rows,
160 extensions: extension_return_value,
161 metadata: Vec::new(),
162 })
163 }
164 BatchRegionDdlRequest::Alter(requests) => {
165 let mut extension_return_value = HashMap::new();
166 let rows = self
167 .inner
168 .alter_regions(requests, &mut extension_return_value)
169 .await
170 .map_err(BoxedError::new)?;
171
172 Ok(RegionResponse {
173 affected_rows: rows,
174 extensions: extension_return_value,
175 metadata: Vec::new(),
176 })
177 }
178 BatchRegionDdlRequest::Drop(requests) => {
179 self.handle_requests(
180 requests
181 .into_iter()
182 .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
183 )
184 .await
185 }
186 }
187 }
188
189 async fn handle_request(
191 &self,
192 region_id: RegionId,
193 request: RegionRequest,
194 ) -> Result<RegionResponse, BoxedError> {
195 let mut extension_return_value = HashMap::new();
196
197 let result = match request {
198 RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
199 RegionRequest::Create(create) => {
200 self.inner
201 .create_regions(vec![(region_id, create)], &mut extension_return_value)
202 .await
203 }
204 RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
205 RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
206 RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
207 RegionRequest::Alter(alter) => {
208 self.inner
209 .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
210 .await
211 }
212 RegionRequest::Compact(_) => {
213 if self.inner.is_physical_region(region_id) {
214 self.inner
215 .mito
216 .handle_request(region_id, request)
217 .await
218 .context(error::MitoFlushOperationSnafu)
219 .map(|response| response.affected_rows)
220 } else {
221 UnsupportedRegionRequestSnafu { request }.fail()
222 }
223 }
224 RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
225 RegionRequest::BuildIndex(_) => {
226 if self.inner.is_physical_region(region_id) {
227 self.inner
228 .mito
229 .handle_request(region_id, request)
230 .await
231 .context(error::MitoFlushOperationSnafu)
232 .map(|response| response.affected_rows)
233 } else {
234 UnsupportedRegionRequestSnafu { request }.fail()
235 }
236 }
237 RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
238 RegionRequest::Delete(_) => {
239 if self.inner.is_physical_region(region_id) {
240 self.inner
241 .mito
242 .handle_request(region_id, request)
243 .await
244 .context(error::MitoDeleteOperationSnafu)
245 .map(|response| response.affected_rows)
246 } else {
247 UnsupportedRegionRequestSnafu { request }.fail()
248 }
249 }
250 RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
251 RegionRequest::BulkInserts(_) => {
252 UnsupportedRegionRequestSnafu { request }.fail()
254 }
255 };
256
257 result.map_err(BoxedError::new).map(|rows| RegionResponse {
258 affected_rows: rows,
259 extensions: extension_return_value,
260 metadata: Vec::new(),
261 })
262 }
263
264 async fn handle_query(
265 &self,
266 region_id: RegionId,
267 request: ScanRequest,
268 ) -> Result<RegionScannerRef, BoxedError> {
269 self.handle_query(region_id, request).await
270 }
271
272 async fn get_committed_sequence(
273 &self,
274 region_id: RegionId,
275 ) -> Result<SequenceNumber, BoxedError> {
276 self.inner
277 .get_last_seq_num(region_id)
278 .await
279 .map_err(BoxedError::new)
280 }
281
282 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
284 self.inner
285 .load_region_metadata(region_id)
286 .await
287 .map_err(BoxedError::new)
288 }
289
290 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
294 if self.inner.is_physical_region(region_id) {
295 get_region_statistic(&self.inner.mito, region_id)
296 } else {
297 None
298 }
299 }
300
301 async fn stop(&self) -> Result<(), BoxedError> {
303 Ok(())
305 }
306
307 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
308 for x in [
310 utils::to_metadata_region_id(region_id),
311 utils::to_data_region_id(region_id),
312 ] {
313 if let Err(e) = self.inner.mito.set_region_role(x, role)
314 && e.status_code() != StatusCode::RegionNotFound
315 {
316 return Err(e);
317 }
318 }
319 Ok(())
320 }
321
322 async fn sync_region(
323 &self,
324 region_id: RegionId,
325 manifest_info: RegionManifestInfo,
326 ) -> Result<SyncManifestResponse, BoxedError> {
327 self.inner
328 .sync_region(region_id, manifest_info)
329 .await
330 .map_err(BoxedError::new)
331 }
332
333 async fn set_region_role_state_gracefully(
334 &self,
335 region_id: RegionId,
336 region_role_state: SettableRegionRoleState,
337 ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
338 let metadata_result = match self
339 .inner
340 .mito
341 .set_region_role_state_gracefully(
342 utils::to_metadata_region_id(region_id),
343 region_role_state,
344 )
345 .await?
346 {
347 SetRegionRoleStateResponse::Success(success) => success,
348 SetRegionRoleStateResponse::NotFound => {
349 return Ok(SetRegionRoleStateResponse::NotFound);
350 }
351 SetRegionRoleStateResponse::InvalidTransition(error) => {
352 return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
353 }
354 };
355
356 let data_result = match self
357 .inner
358 .mito
359 .set_region_role_state_gracefully(region_id, region_role_state)
360 .await?
361 {
362 SetRegionRoleStateResponse::Success(success) => success,
363 SetRegionRoleStateResponse::NotFound => {
364 return Ok(SetRegionRoleStateResponse::NotFound);
365 }
366 SetRegionRoleStateResponse::InvalidTransition(error) => {
367 return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
368 }
369 };
370
371 Ok(SetRegionRoleStateResponse::success(
372 SetRegionRoleStateSuccess::metric(
373 data_result.last_entry_id().unwrap_or_default(),
374 metadata_result.last_entry_id().unwrap_or_default(),
375 ),
376 ))
377 }
378
379 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
383 if self.inner.is_physical_region(region_id) {
384 self.inner.mito.role(region_id)
385 } else {
386 None
387 }
388 }
389
390 fn as_any(&self) -> &dyn Any {
391 self
392 }
393}
394
395impl MetricEngine {
396 pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
397 let metadata_region = MetadataRegion::new(mito.clone());
398 let data_region = DataRegion::new(mito.clone());
399 let state = Arc::new(RwLock::default());
400 config.sanitize();
401 let flush_interval = config.flush_metadata_region_interval;
402 let inner = Arc::new(MetricEngineInner {
403 mito: mito.clone(),
404 metadata_region,
405 data_region,
406 state: state.clone(),
407 config,
408 row_modifier: RowModifier::default(),
409 flush_task: RepeatedTask::new(
410 flush_interval,
411 Box::new(FlushMetadataRegionTask {
412 state: state.clone(),
413 mito: mito.clone(),
414 }),
415 ),
416 });
417 inner
418 .flush_task
419 .start(common_runtime::global_runtime())
420 .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
421 Ok(Self { inner })
422 }
423
424 pub fn mito(&self) -> MitoEngine {
425 self.inner.mito.clone()
426 }
427
428 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
430 self.inner
431 .metadata_region
432 .logical_regions(physical_region_id)
433 .await
434 }
435
436 async fn handle_query(
438 &self,
439 region_id: RegionId,
440 request: ScanRequest,
441 ) -> Result<RegionScannerRef, BoxedError> {
442 self.inner
443 .read_region(region_id, request)
444 .await
445 .map_err(BoxedError::new)
446 }
447
448 async fn handle_requests(
449 &self,
450 requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
451 ) -> Result<RegionResponse, BoxedError> {
452 let mut affected_rows = 0;
453 let mut extensions = HashMap::new();
454 for (region_id, request) in requests {
455 let response = self.handle_request(region_id, request).await?;
456 affected_rows += response.affected_rows;
457 extensions.extend(response.extensions);
458 }
459
460 Ok(RegionResponse {
461 affected_rows,
462 extensions,
463 metadata: Vec::new(),
464 })
465 }
466}
467
/// Helpers that are only compiled for tests.
#[cfg(test)]
impl MetricEngine {
    /// Scans `region_id` with `request` and returns the result as a record batch stream.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        let inner: &MetricEngineInner = &self.inner;
        inner.scan_to_stream(region_id, request).await
    }

    /// Returns the configuration the engine was built with.
    pub fn config(&self) -> &EngineConfig {
        let MetricEngineInner { config, .. } = &*self.inner;
        config
    }
}
483
/// State shared by every clone of [`MetricEngine`].
struct MetricEngineInner {
    /// Underlying mito engine that region requests are forwarded to.
    mito: MitoEngine,
    /// Metadata-region accessor, built on a clone of the same mito engine.
    metadata_region: MetadataRegion,
    /// Data-region accessor, built on a clone of the same mito engine.
    data_region: DataRegion,
    /// Engine state; the same `Arc` is also handed to the metadata flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration, sanitized when the engine is constructed.
    config: EngineConfig,
    /// Row modifier, created with its default value.
    row_modifier: RowModifier,
    /// Repeated task running `FlushMetadataRegionTask`; started in `MetricEngine::try_new`.
    flush_task: RepeatedTask<Error>,
}
493
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use common_telemetry::info;
    use mito2::sst::location::region_dir_from_table_dir;
    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
    use store_api::region_request::{
        PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
    };

    use super::*;
    use crate::test_util::TestEnv;

    /// Builds a metric-engine open request for the default table dir with `options`.
    fn open_request_with(options: HashMap<String, String>) -> RegionOpenRequest {
        RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options,
            skip_wal_replay: false,
            checkpoint: None,
        }
    }

    /// Builds an open request whose options carry `PHYSICAL_TABLE_METADATA_KEY`.
    fn physical_open_request() -> RegionOpenRequest {
        let options = HashMap::from([(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]);
        open_request_with(options)
    }

    #[tokio::test]
    async fn close_open_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();
        let physical_id = env.default_physical_region_id();

        // Close the physical region ...
        let close = RegionRequest::Close(RegionCloseRequest {});
        engine.handle_request(physical_id, close).await.unwrap();

        // ... and open it again.
        let reopen = RegionRequest::Open(physical_open_request());
        engine.handle_request(physical_id, reopen).await.unwrap();

        // Closing a region that does not exist succeeds.
        let missing_id = RegionId::new(12313, 12);
        let close_missing = RegionRequest::Close(RegionCloseRequest {});
        engine.handle_request(missing_id, close_missing).await.unwrap();

        // Opening a region that does not exist (without physical options) succeeds too.
        let open_missing = RegionRequest::Open(open_request_with(HashMap::new()));
        engine.handle_request(missing_id, open_missing).await.unwrap();
    }

    #[tokio::test]
    async fn test_role() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_id = env.default_logical_region_id();
        let physical_id = env.default_physical_region_id();

        // Only physical regions expose a role.
        assert!(env.metric().role(logical_id).is_none());
        assert!(env.metric().role(physical_id).is_some());
    }

    #[tokio::test]
    async fn test_region_disk_usage() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_id = env.default_logical_region_id();
        let physical_id = env.default_physical_region_id();

        // Only physical regions expose a statistic.
        assert!(env.metric().region_statistic(logical_id).is_none());
        assert!(env.metric().region_statistic(physical_id).is_some());
    }

    #[tokio::test]
    async fn test_open_region_failure() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let physical_id = env.default_physical_region_id();
        let engine = env.metric();

        let flush = RegionRequest::Flush(RegionFlushRequest {
            row_group_size: None,
        });
        engine.handle_request(physical_id, flush).await.unwrap();

        // Remove every parquet file directly under the metadata region directory.
        let metadata_dir = region_dir_from_table_dir(
            &TestEnv::default_table_dir(),
            physical_id,
            PathType::Metadata,
        );
        let object_store = env.get_object_store().unwrap();
        let entries = object_store.list(&metadata_dir).await.unwrap();
        for entry in entries {
            let is_parquet_file =
                !entry.metadata().is_dir() && entry.name().ends_with("parquet");
            if is_parquet_file {
                info!("deleting {}", entry.path());
                object_store.delete(entry.path()).await.unwrap();
            }
        }

        // The region has not been closed yet, so this open request still succeeds.
        let open_again = RegionRequest::Open(physical_open_request());
        engine.handle_request(physical_id, open_again).await.unwrap();

        let close = RegionRequest::Close(RegionCloseRequest {});
        engine.handle_request(physical_id, close).await.unwrap();

        // After closing, reopening fails.
        let reopen = RegionRequest::Open(physical_open_request());
        let err = engine.handle_request(physical_id, reopen).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);

        // After the failed open, mito knows neither the data nor the metadata region.
        let mito = engine.mito();
        let data_id = utils::to_data_region_id(physical_id);
        let metadata_id = utils::to_metadata_region_id(physical_id);

        let err = mito.get_metadata(data_id).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);

        let err = mito.get_metadata(metadata_id).await.unwrap_err();
        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
    }
}