1mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46 RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47 SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
60#[cfg_attr(doc, aquamarine::aquamarine)]
61#[derive(Clone)]
123pub struct MetricEngine {
124 inner: Arc<MetricEngineInner>,
125}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
129 fn name(&self) -> &str {
131 METRIC_ENGINE_NAME
132 }
133
134 async fn handle_batch_open_requests(
135 &self,
136 parallelism: usize,
137 requests: Vec<(RegionId, RegionOpenRequest)>,
138 ) -> Result<BatchResponses, BoxedError> {
139 self.inner
140 .handle_batch_open_requests(parallelism, requests)
141 .await
142 .map_err(BoxedError::new)
143 }
144
145 async fn handle_batch_ddl_requests(
146 &self,
147 batch_request: BatchRegionDdlRequest,
148 ) -> Result<RegionResponse, BoxedError> {
149 match batch_request {
150 BatchRegionDdlRequest::Create(requests) => {
151 let mut extension_return_value = HashMap::new();
152 let rows = self
153 .inner
154 .create_regions(requests, &mut extension_return_value)
155 .await
156 .map_err(BoxedError::new)?;
157
158 Ok(RegionResponse {
159 affected_rows: rows,
160 extensions: extension_return_value,
161 })
162 }
163 BatchRegionDdlRequest::Alter(requests) => {
164 let mut extension_return_value = HashMap::new();
165 let rows = self
166 .inner
167 .alter_regions(requests, &mut extension_return_value)
168 .await
169 .map_err(BoxedError::new)?;
170
171 Ok(RegionResponse {
172 affected_rows: rows,
173 extensions: extension_return_value,
174 })
175 }
176 BatchRegionDdlRequest::Drop(requests) => {
177 self.handle_requests(
178 requests
179 .into_iter()
180 .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
181 )
182 .await
183 }
184 }
185 }
186
187 async fn handle_request(
189 &self,
190 region_id: RegionId,
191 request: RegionRequest,
192 ) -> Result<RegionResponse, BoxedError> {
193 let mut extension_return_value = HashMap::new();
194
195 let result = match request {
196 RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
197 RegionRequest::Create(create) => {
198 self.inner
199 .create_regions(vec![(region_id, create)], &mut extension_return_value)
200 .await
201 }
202 RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
203 RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
204 RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
205 RegionRequest::Alter(alter) => {
206 self.inner
207 .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
208 .await
209 }
210 RegionRequest::Compact(_) => {
211 if self.inner.is_physical_region(region_id) {
212 self.inner
213 .mito
214 .handle_request(region_id, request)
215 .await
216 .context(error::MitoFlushOperationSnafu)
217 .map(|response| response.affected_rows)
218 } else {
219 UnsupportedRegionRequestSnafu { request }.fail()
220 }
221 }
222 RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
223 RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
224 RegionRequest::Delete(_) => {
225 if self.inner.is_physical_region(region_id) {
226 self.inner
227 .mito
228 .handle_request(region_id, request)
229 .await
230 .context(error::MitoDeleteOperationSnafu)
231 .map(|response| response.affected_rows)
232 } else {
233 UnsupportedRegionRequestSnafu { request }.fail()
234 }
235 }
236 RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
237 RegionRequest::BulkInserts(_) => {
238 UnsupportedRegionRequestSnafu { request }.fail()
240 }
241 };
242
243 result.map_err(BoxedError::new).map(|rows| RegionResponse {
244 affected_rows: rows,
245 extensions: extension_return_value,
246 })
247 }
248
249 async fn handle_query(
250 &self,
251 region_id: RegionId,
252 request: ScanRequest,
253 ) -> Result<RegionScannerRef, BoxedError> {
254 self.handle_query(region_id, request).await
255 }
256
257 async fn get_last_seq_num(
258 &self,
259 region_id: RegionId,
260 ) -> Result<Option<SequenceNumber>, BoxedError> {
261 self.inner
262 .get_last_seq_num(region_id)
263 .await
264 .map_err(BoxedError::new)
265 }
266
267 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
269 self.inner
270 .load_region_metadata(region_id)
271 .await
272 .map_err(BoxedError::new)
273 }
274
275 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
279 if self.inner.is_physical_region(region_id) {
280 get_region_statistic(&self.inner.mito, region_id)
281 } else {
282 None
283 }
284 }
285
286 async fn stop(&self) -> Result<(), BoxedError> {
288 Ok(())
290 }
291
292 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
293 for x in [
295 utils::to_metadata_region_id(region_id),
296 utils::to_data_region_id(region_id),
297 ] {
298 if let Err(e) = self.inner.mito.set_region_role(x, role)
299 && e.status_code() != StatusCode::RegionNotFound
300 {
301 return Err(e);
302 }
303 }
304 Ok(())
305 }
306
307 async fn sync_region(
308 &self,
309 region_id: RegionId,
310 manifest_info: RegionManifestInfo,
311 ) -> Result<SyncManifestResponse, BoxedError> {
312 self.inner
313 .sync_region(region_id, manifest_info)
314 .await
315 .map_err(BoxedError::new)
316 }
317
318 async fn set_region_role_state_gracefully(
319 &self,
320 region_id: RegionId,
321 region_role_state: SettableRegionRoleState,
322 ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
323 let metadata_result = match self
324 .inner
325 .mito
326 .set_region_role_state_gracefully(
327 utils::to_metadata_region_id(region_id),
328 region_role_state,
329 )
330 .await?
331 {
332 SetRegionRoleStateResponse::Success(success) => success,
333 SetRegionRoleStateResponse::NotFound => {
334 return Ok(SetRegionRoleStateResponse::NotFound)
335 }
336 };
337
338 let data_result = match self
339 .inner
340 .mito
341 .set_region_role_state_gracefully(region_id, region_role_state)
342 .await?
343 {
344 SetRegionRoleStateResponse::Success(success) => success,
345 SetRegionRoleStateResponse::NotFound => {
346 return Ok(SetRegionRoleStateResponse::NotFound)
347 }
348 };
349
350 Ok(SetRegionRoleStateResponse::success(
351 SetRegionRoleStateSuccess::metric(
352 data_result.last_entry_id().unwrap_or_default(),
353 metadata_result.last_entry_id().unwrap_or_default(),
354 ),
355 ))
356 }
357
358 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
362 if self.inner.is_physical_region(region_id) {
363 self.inner.mito.role(region_id)
364 } else {
365 None
366 }
367 }
368
369 fn as_any(&self) -> &dyn Any {
370 self
371 }
372}
373
374impl MetricEngine {
375 pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
376 let metadata_region = MetadataRegion::new(mito.clone());
377 let data_region = DataRegion::new(mito.clone());
378 let state = Arc::new(RwLock::default());
379 config.sanitize();
380 let flush_interval = config.flush_metadata_region_interval;
381 let inner = Arc::new(MetricEngineInner {
382 mito: mito.clone(),
383 metadata_region,
384 data_region,
385 state: state.clone(),
386 config,
387 row_modifier: RowModifier::new(),
388 flush_task: RepeatedTask::new(
389 flush_interval,
390 Box::new(FlushMetadataRegionTask {
391 state: state.clone(),
392 mito: mito.clone(),
393 }),
394 ),
395 });
396 inner
397 .flush_task
398 .start(common_runtime::global_runtime())
399 .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
400 Ok(Self { inner })
401 }
402
403 pub fn mito(&self) -> MitoEngine {
404 self.inner.mito.clone()
405 }
406
407 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
409 self.inner
410 .metadata_region
411 .logical_regions(physical_region_id)
412 .await
413 }
414
415 async fn handle_query(
417 &self,
418 region_id: RegionId,
419 request: ScanRequest,
420 ) -> Result<RegionScannerRef, BoxedError> {
421 self.inner
422 .read_region(region_id, request)
423 .await
424 .map_err(BoxedError::new)
425 }
426
427 async fn handle_requests(
428 &self,
429 requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
430 ) -> Result<RegionResponse, BoxedError> {
431 let mut affected_rows = 0;
432 let mut extensions = HashMap::new();
433 for (region_id, request) in requests {
434 let response = self.handle_request(region_id, request).await?;
435 affected_rows += response.affected_rows;
436 extensions.extend(response.extensions);
437 }
438
439 Ok(RegionResponse {
440 affected_rows,
441 extensions,
442 })
443 }
444}
445
446#[cfg(test)]
447impl MetricEngine {
448 pub async fn scan_to_stream(
449 &self,
450 region_id: RegionId,
451 request: ScanRequest,
452 ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
453 self.inner.scan_to_stream(region_id, request).await
454 }
455
456 pub fn config(&self) -> &EngineConfig {
458 &self.inner.config
459 }
460}
461
462struct MetricEngineInner {
463 mito: MitoEngine,
464 metadata_region: MetadataRegion,
465 data_region: DataRegion,
466 state: Arc<RwLock<MetricEngineState>>,
467 config: EngineConfig,
468 row_modifier: RowModifier,
469 flush_task: RepeatedTask<Error>,
470}
471
472#[cfg(test)]
473mod test {
474 use std::collections::HashMap;
475
476 use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
477 use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
478
479 use super::*;
480 use crate::test_util::TestEnv;
481
482 #[tokio::test]
483 async fn close_open_regions() {
484 let env = TestEnv::new().await;
485 env.init_metric_region().await;
486 let engine = env.metric();
487
488 let physical_region_id = env.default_physical_region_id();
490 engine
491 .handle_request(
492 physical_region_id,
493 RegionRequest::Close(RegionCloseRequest {}),
494 )
495 .await
496 .unwrap();
497
498 let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
500 .into_iter()
501 .collect();
502 let open_request = RegionOpenRequest {
503 engine: METRIC_ENGINE_NAME.to_string(),
504 region_dir: env.default_region_dir(),
505 options: physical_region_option,
506 skip_wal_replay: false,
507 };
508 engine
509 .handle_request(physical_region_id, RegionRequest::Open(open_request))
510 .await
511 .unwrap();
512
513 let nonexistent_region_id = RegionId::new(12313, 12);
515 engine
516 .handle_request(
517 nonexistent_region_id,
518 RegionRequest::Close(RegionCloseRequest {}),
519 )
520 .await
521 .unwrap();
522
523 let invalid_open_request = RegionOpenRequest {
525 engine: METRIC_ENGINE_NAME.to_string(),
526 region_dir: env.default_region_dir(),
527 options: HashMap::new(),
528 skip_wal_replay: false,
529 };
530 engine
531 .handle_request(
532 nonexistent_region_id,
533 RegionRequest::Open(invalid_open_request),
534 )
535 .await
536 .unwrap();
537 }
538
539 #[tokio::test]
540 async fn test_role() {
541 let env = TestEnv::new().await;
542 env.init_metric_region().await;
543
544 let logical_region_id = env.default_logical_region_id();
545 let physical_region_id = env.default_physical_region_id();
546
547 assert!(env.metric().role(logical_region_id).is_none());
548 assert!(env.metric().role(physical_region_id).is_some());
549 }
550
551 #[tokio::test]
552 async fn test_region_disk_usage() {
553 let env = TestEnv::new().await;
554 env.init_metric_region().await;
555
556 let logical_region_id = env.default_logical_region_id();
557 let physical_region_id = env.default_physical_region_id();
558
559 assert!(env.metric().region_statistic(logical_region_id).is_none());
560 assert!(env.metric().region_statistic(physical_region_id).is_some());
561 }
562}