1mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46 RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47 SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Region engine registered under `METRIC_ENGINE_NAME` and layered on top of a `MitoEngine`.
///
/// The handle only owns an `Arc` to the shared inner state, so cloning it is cheap and
/// every clone operates on the same underlying engine.
#[derive(Clone)]
pub struct MetricEngine {
    /// Shared state behind all clones of this handle.
    inner: Arc<MetricEngineInner>,
}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
129 fn name(&self) -> &str {
131 METRIC_ENGINE_NAME
132 }
133
134 async fn handle_batch_open_requests(
135 &self,
136 parallelism: usize,
137 requests: Vec<(RegionId, RegionOpenRequest)>,
138 ) -> Result<BatchResponses, BoxedError> {
139 self.inner
140 .handle_batch_open_requests(parallelism, requests)
141 .await
142 .map_err(BoxedError::new)
143 }
144
145 async fn handle_batch_ddl_requests(
146 &self,
147 batch_request: BatchRegionDdlRequest,
148 ) -> Result<RegionResponse, BoxedError> {
149 match batch_request {
150 BatchRegionDdlRequest::Create(requests) => {
151 let mut extension_return_value = HashMap::new();
152 let rows = self
153 .inner
154 .create_regions(requests, &mut extension_return_value)
155 .await
156 .map_err(BoxedError::new)?;
157
158 Ok(RegionResponse {
159 affected_rows: rows,
160 extensions: extension_return_value,
161 metadata: Vec::new(),
162 })
163 }
164 BatchRegionDdlRequest::Alter(requests) => {
165 let mut extension_return_value = HashMap::new();
166 let rows = self
167 .inner
168 .alter_regions(requests, &mut extension_return_value)
169 .await
170 .map_err(BoxedError::new)?;
171
172 Ok(RegionResponse {
173 affected_rows: rows,
174 extensions: extension_return_value,
175 metadata: Vec::new(),
176 })
177 }
178 BatchRegionDdlRequest::Drop(requests) => {
179 self.handle_requests(
180 requests
181 .into_iter()
182 .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
183 )
184 .await
185 }
186 }
187 }
188
189 async fn handle_request(
191 &self,
192 region_id: RegionId,
193 request: RegionRequest,
194 ) -> Result<RegionResponse, BoxedError> {
195 let mut extension_return_value = HashMap::new();
196
197 let result = match request {
198 RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
199 RegionRequest::Create(create) => {
200 self.inner
201 .create_regions(vec![(region_id, create)], &mut extension_return_value)
202 .await
203 }
204 RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
205 RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
206 RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
207 RegionRequest::Alter(alter) => {
208 self.inner
209 .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
210 .await
211 }
212 RegionRequest::Compact(_) => {
213 if self.inner.is_physical_region(region_id) {
214 self.inner
215 .mito
216 .handle_request(region_id, request)
217 .await
218 .context(error::MitoFlushOperationSnafu)
219 .map(|response| response.affected_rows)
220 } else {
221 UnsupportedRegionRequestSnafu { request }.fail()
222 }
223 }
224 RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
225 RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
226 RegionRequest::Delete(_) => {
227 if self.inner.is_physical_region(region_id) {
228 self.inner
229 .mito
230 .handle_request(region_id, request)
231 .await
232 .context(error::MitoDeleteOperationSnafu)
233 .map(|response| response.affected_rows)
234 } else {
235 UnsupportedRegionRequestSnafu { request }.fail()
236 }
237 }
238 RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
239 RegionRequest::BulkInserts(_) => {
240 UnsupportedRegionRequestSnafu { request }.fail()
242 }
243 };
244
245 result.map_err(BoxedError::new).map(|rows| RegionResponse {
246 affected_rows: rows,
247 extensions: extension_return_value,
248 metadata: Vec::new(),
249 })
250 }
251
252 async fn handle_query(
253 &self,
254 region_id: RegionId,
255 request: ScanRequest,
256 ) -> Result<RegionScannerRef, BoxedError> {
257 self.handle_query(region_id, request).await
258 }
259
260 async fn get_last_seq_num(
261 &self,
262 region_id: RegionId,
263 ) -> Result<Option<SequenceNumber>, BoxedError> {
264 self.inner
265 .get_last_seq_num(region_id)
266 .await
267 .map_err(BoxedError::new)
268 }
269
270 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
272 self.inner
273 .load_region_metadata(region_id)
274 .await
275 .map_err(BoxedError::new)
276 }
277
278 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
282 if self.inner.is_physical_region(region_id) {
283 get_region_statistic(&self.inner.mito, region_id)
284 } else {
285 None
286 }
287 }
288
289 async fn stop(&self) -> Result<(), BoxedError> {
291 Ok(())
293 }
294
295 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
296 for x in [
298 utils::to_metadata_region_id(region_id),
299 utils::to_data_region_id(region_id),
300 ] {
301 if let Err(e) = self.inner.mito.set_region_role(x, role)
302 && e.status_code() != StatusCode::RegionNotFound
303 {
304 return Err(e);
305 }
306 }
307 Ok(())
308 }
309
310 async fn sync_region(
311 &self,
312 region_id: RegionId,
313 manifest_info: RegionManifestInfo,
314 ) -> Result<SyncManifestResponse, BoxedError> {
315 self.inner
316 .sync_region(region_id, manifest_info)
317 .await
318 .map_err(BoxedError::new)
319 }
320
321 async fn set_region_role_state_gracefully(
322 &self,
323 region_id: RegionId,
324 region_role_state: SettableRegionRoleState,
325 ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
326 let metadata_result = match self
327 .inner
328 .mito
329 .set_region_role_state_gracefully(
330 utils::to_metadata_region_id(region_id),
331 region_role_state,
332 )
333 .await?
334 {
335 SetRegionRoleStateResponse::Success(success) => success,
336 SetRegionRoleStateResponse::NotFound => {
337 return Ok(SetRegionRoleStateResponse::NotFound)
338 }
339 };
340
341 let data_result = match self
342 .inner
343 .mito
344 .set_region_role_state_gracefully(region_id, region_role_state)
345 .await?
346 {
347 SetRegionRoleStateResponse::Success(success) => success,
348 SetRegionRoleStateResponse::NotFound => {
349 return Ok(SetRegionRoleStateResponse::NotFound)
350 }
351 };
352
353 Ok(SetRegionRoleStateResponse::success(
354 SetRegionRoleStateSuccess::metric(
355 data_result.last_entry_id().unwrap_or_default(),
356 metadata_result.last_entry_id().unwrap_or_default(),
357 ),
358 ))
359 }
360
361 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
365 if self.inner.is_physical_region(region_id) {
366 self.inner.mito.role(region_id)
367 } else {
368 None
369 }
370 }
371
372 fn as_any(&self) -> &dyn Any {
373 self
374 }
375}
376
377impl MetricEngine {
378 pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
379 let metadata_region = MetadataRegion::new(mito.clone());
380 let data_region = DataRegion::new(mito.clone());
381 let state = Arc::new(RwLock::default());
382 config.sanitize();
383 let flush_interval = config.flush_metadata_region_interval;
384 let inner = Arc::new(MetricEngineInner {
385 mito: mito.clone(),
386 metadata_region,
387 data_region,
388 state: state.clone(),
389 config,
390 row_modifier: RowModifier::default(),
391 flush_task: RepeatedTask::new(
392 flush_interval,
393 Box::new(FlushMetadataRegionTask {
394 state: state.clone(),
395 mito: mito.clone(),
396 }),
397 ),
398 });
399 inner
400 .flush_task
401 .start(common_runtime::global_runtime())
402 .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
403 Ok(Self { inner })
404 }
405
406 pub fn mito(&self) -> MitoEngine {
407 self.inner.mito.clone()
408 }
409
410 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
412 self.inner
413 .metadata_region
414 .logical_regions(physical_region_id)
415 .await
416 }
417
418 async fn handle_query(
420 &self,
421 region_id: RegionId,
422 request: ScanRequest,
423 ) -> Result<RegionScannerRef, BoxedError> {
424 self.inner
425 .read_region(region_id, request)
426 .await
427 .map_err(BoxedError::new)
428 }
429
430 async fn handle_requests(
431 &self,
432 requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
433 ) -> Result<RegionResponse, BoxedError> {
434 let mut affected_rows = 0;
435 let mut extensions = HashMap::new();
436 for (region_id, request) in requests {
437 let response = self.handle_request(region_id, request).await?;
438 affected_rows += response.affected_rows;
439 extensions.extend(response.extensions);
440 }
441
442 Ok(RegionResponse {
443 affected_rows,
444 extensions,
445 metadata: Vec::new(),
446 })
447 }
448}
449
#[cfg(test)]
impl MetricEngine {
    /// Test-only helper: scans `region_id` and returns the result as a record batch stream.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        let Self { inner } = self;
        inner.scan_to_stream(region_id, request).await
    }

    /// Test-only helper: borrows the configuration stored in the engine.
    pub fn config(&self) -> &EngineConfig {
        let Self { inner } = self;
        &inner.config
    }
}
465
/// State shared by every clone of `MetricEngine`.
struct MetricEngineInner {
    /// Underlying mito engine that physical-region requests are forwarded to.
    mito: MitoEngine,
    /// Metadata-region accessor, built from a clone of `mito`.
    metadata_region: MetadataRegion,
    /// Data-region accessor, built from a clone of `mito`.
    data_region: DataRegion,
    /// Engine state; the same `Arc` is also handed to the repeated flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration, sanitized in `try_new` before being stored.
    config: EngineConfig,
    /// NOTE(review): not used in this file; presumably applied to rows on the
    /// write path. Confirm against the `put` module.
    row_modifier: RowModifier,
    /// Repeated task running `FlushMetadataRegionTask`; started in `try_new`.
    flush_task: RepeatedTask<Error>,
}
475
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use common_telemetry::info;
    use mito2::sst::location::region_dir_from_table_dir;
    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
    use store_api::region_request::{
        PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest,
    };

    use super::*;
    use crate::test_util::TestEnv;

    /// Open options containing only `PHYSICAL_TABLE_METADATA_KEY` with an empty value.
    fn physical_region_options() -> HashMap<String, String> {
        HashMap::from([(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())])
    }

    /// Open request for the default table dir with the given `options`.
    fn open_request(options: HashMap<String, String>) -> RegionRequest {
        RegionRequest::Open(RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare,
            options,
            skip_wal_replay: false,
        })
    }

    /// Plain close request.
    fn close_request() -> RegionRequest {
        RegionRequest::Close(RegionCloseRequest {})
    }

    #[tokio::test]
    async fn close_open_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();
        let physical_region_id = env.default_physical_region_id();

        // Closing and then reopening the physical region must both succeed.
        engine
            .handle_request(physical_region_id, close_request())
            .await
            .unwrap();
        engine
            .handle_request(physical_region_id, open_request(physical_region_options()))
            .await
            .unwrap();

        // Closing a region id that was never created must not report an error.
        let nonexistent_region_id = RegionId::new(12313, 12);
        engine
            .handle_request(nonexistent_region_id, close_request())
            .await
            .unwrap();

        // Opening that id without any options must not report an error either.
        engine
            .handle_request(nonexistent_region_id, open_request(HashMap::new()))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_role() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Only the physical region reports a role.
        let logical_role = engine.role(env.default_logical_region_id());
        let physical_role = engine.role(env.default_physical_region_id());

        assert!(logical_role.is_none());
        assert!(physical_role.is_some());
    }

    #[tokio::test]
    async fn test_region_disk_usage() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Only the physical region reports a statistic.
        let logical_stat = engine.region_statistic(env.default_logical_region_id());
        let physical_stat = engine.region_statistic(env.default_physical_region_id());

        assert!(logical_stat.is_none());
        assert!(physical_stat.is_some());
    }

    #[tokio::test]
    async fn test_open_region_failure() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();
        let metric_engine = env.metric();

        // Flush first so that there are files on disk to remove.
        let flush = RegionRequest::Flush(RegionFlushRequest {
            row_group_size: None,
        });
        metric_engine
            .handle_request(physical_region_id, flush)
            .await
            .unwrap();

        // Delete every parquet file directly under the metadata region dir.
        let metadata_dir = region_dir_from_table_dir(
            &TestEnv::default_table_dir(),
            physical_region_id,
            PathType::Metadata,
        );
        let object_store = env.get_object_store().unwrap();
        let entries = object_store.list(&metadata_dir).await.unwrap();
        for entry in entries {
            let is_parquet_file =
                !entry.metadata().is_dir() && entry.name().ends_with("parquet");
            if is_parquet_file {
                info!("deleting {}", entry.path());
                object_store.delete(entry.path()).await.unwrap();
            }
        }

        // The region is still open at this point, and opening it again succeeds.
        metric_engine
            .handle_request(physical_region_id, open_request(physical_region_options()))
            .await
            .unwrap();

        // After closing, reopening is expected to fail with `StorageUnavailable`.
        metric_engine
            .handle_request(physical_region_id, close_request())
            .await
            .unwrap();
        let open_err = metric_engine
            .handle_request(physical_region_id, open_request(physical_region_options()))
            .await
            .unwrap_err();
        assert_eq!(open_err.status_code(), StatusCode::StorageUnavailable);

        // After the failed open, mito knows neither the data nor the metadata region.
        let mito_engine = metric_engine.mito();
        let data_region_id = utils::to_data_region_id(physical_region_id);
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);

        let data_err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
        assert_eq!(data_err.status_code(), StatusCode::RegionNotFound);

        let metadata_err = mito_engine
            .get_metadata(metadata_region_id)
            .await
            .unwrap_err();
        assert_eq!(metadata_err.status_code(), StatusCode::RegionNotFound);
    }
}