1mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45 BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46 RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47 SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
60#[cfg_attr(doc, aquamarine::aquamarine)]
61#[derive(Clone)]
123pub struct MetricEngine {
124 inner: Arc<MetricEngineInner>,
125}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
129 fn name(&self) -> &str {
131 METRIC_ENGINE_NAME
132 }
133
134 async fn handle_batch_open_requests(
135 &self,
136 parallelism: usize,
137 requests: Vec<(RegionId, RegionOpenRequest)>,
138 ) -> Result<BatchResponses, BoxedError> {
139 self.inner
140 .handle_batch_open_requests(parallelism, requests)
141 .await
142 .map_err(BoxedError::new)
143 }
144
145 async fn handle_batch_ddl_requests(
146 &self,
147 batch_request: BatchRegionDdlRequest,
148 ) -> Result<RegionResponse, BoxedError> {
149 match batch_request {
150 BatchRegionDdlRequest::Create(requests) => {
151 let mut extension_return_value = HashMap::new();
152 let rows = self
153 .inner
154 .create_regions(requests, &mut extension_return_value)
155 .await
156 .map_err(BoxedError::new)?;
157
158 Ok(RegionResponse {
159 affected_rows: rows,
160 extensions: extension_return_value,
161 metadata: Vec::new(),
162 })
163 }
164 BatchRegionDdlRequest::Alter(requests) => {
165 let mut extension_return_value = HashMap::new();
166 let rows = self
167 .inner
168 .alter_regions(requests, &mut extension_return_value)
169 .await
170 .map_err(BoxedError::new)?;
171
172 Ok(RegionResponse {
173 affected_rows: rows,
174 extensions: extension_return_value,
175 metadata: Vec::new(),
176 })
177 }
178 BatchRegionDdlRequest::Drop(requests) => {
179 self.handle_requests(
180 requests
181 .into_iter()
182 .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
183 )
184 .await
185 }
186 }
187 }
188
189 async fn handle_request(
191 &self,
192 region_id: RegionId,
193 request: RegionRequest,
194 ) -> Result<RegionResponse, BoxedError> {
195 let mut extension_return_value = HashMap::new();
196
197 let result = match request {
198 RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
199 RegionRequest::Create(create) => {
200 self.inner
201 .create_regions(vec![(region_id, create)], &mut extension_return_value)
202 .await
203 }
204 RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
205 RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
206 RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
207 RegionRequest::Alter(alter) => {
208 self.inner
209 .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
210 .await
211 }
212 RegionRequest::Compact(_) => {
213 if self.inner.is_physical_region(region_id) {
214 self.inner
215 .mito
216 .handle_request(region_id, request)
217 .await
218 .context(error::MitoFlushOperationSnafu)
219 .map(|response| response.affected_rows)
220 } else {
221 UnsupportedRegionRequestSnafu { request }.fail()
222 }
223 }
224 RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
225 RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
226 RegionRequest::Delete(_) => {
227 if self.inner.is_physical_region(region_id) {
228 self.inner
229 .mito
230 .handle_request(region_id, request)
231 .await
232 .context(error::MitoDeleteOperationSnafu)
233 .map(|response| response.affected_rows)
234 } else {
235 UnsupportedRegionRequestSnafu { request }.fail()
236 }
237 }
238 RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
239 RegionRequest::BulkInserts(_) => {
240 UnsupportedRegionRequestSnafu { request }.fail()
242 }
243 };
244
245 result.map_err(BoxedError::new).map(|rows| RegionResponse {
246 affected_rows: rows,
247 extensions: extension_return_value,
248 metadata: Vec::new(),
249 })
250 }
251
252 async fn handle_query(
253 &self,
254 region_id: RegionId,
255 request: ScanRequest,
256 ) -> Result<RegionScannerRef, BoxedError> {
257 self.handle_query(region_id, request).await
258 }
259
260 async fn get_last_seq_num(
261 &self,
262 region_id: RegionId,
263 ) -> Result<Option<SequenceNumber>, BoxedError> {
264 self.inner
265 .get_last_seq_num(region_id)
266 .await
267 .map_err(BoxedError::new)
268 }
269
270 async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
272 self.inner
273 .load_region_metadata(region_id)
274 .await
275 .map_err(BoxedError::new)
276 }
277
278 fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
282 if self.inner.is_physical_region(region_id) {
283 get_region_statistic(&self.inner.mito, region_id)
284 } else {
285 None
286 }
287 }
288
289 async fn stop(&self) -> Result<(), BoxedError> {
291 Ok(())
293 }
294
295 fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
296 for x in [
298 utils::to_metadata_region_id(region_id),
299 utils::to_data_region_id(region_id),
300 ] {
301 if let Err(e) = self.inner.mito.set_region_role(x, role)
302 && e.status_code() != StatusCode::RegionNotFound
303 {
304 return Err(e);
305 }
306 }
307 Ok(())
308 }
309
310 async fn sync_region(
311 &self,
312 region_id: RegionId,
313 manifest_info: RegionManifestInfo,
314 ) -> Result<SyncManifestResponse, BoxedError> {
315 self.inner
316 .sync_region(region_id, manifest_info)
317 .await
318 .map_err(BoxedError::new)
319 }
320
321 async fn set_region_role_state_gracefully(
322 &self,
323 region_id: RegionId,
324 region_role_state: SettableRegionRoleState,
325 ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
326 let metadata_result = match self
327 .inner
328 .mito
329 .set_region_role_state_gracefully(
330 utils::to_metadata_region_id(region_id),
331 region_role_state,
332 )
333 .await?
334 {
335 SetRegionRoleStateResponse::Success(success) => success,
336 SetRegionRoleStateResponse::NotFound => {
337 return Ok(SetRegionRoleStateResponse::NotFound)
338 }
339 };
340
341 let data_result = match self
342 .inner
343 .mito
344 .set_region_role_state_gracefully(region_id, region_role_state)
345 .await?
346 {
347 SetRegionRoleStateResponse::Success(success) => success,
348 SetRegionRoleStateResponse::NotFound => {
349 return Ok(SetRegionRoleStateResponse::NotFound)
350 }
351 };
352
353 Ok(SetRegionRoleStateResponse::success(
354 SetRegionRoleStateSuccess::metric(
355 data_result.last_entry_id().unwrap_or_default(),
356 metadata_result.last_entry_id().unwrap_or_default(),
357 ),
358 ))
359 }
360
361 fn role(&self, region_id: RegionId) -> Option<RegionRole> {
365 if self.inner.is_physical_region(region_id) {
366 self.inner.mito.role(region_id)
367 } else {
368 None
369 }
370 }
371
372 fn as_any(&self) -> &dyn Any {
373 self
374 }
375}
376
377impl MetricEngine {
378 pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
379 let metadata_region = MetadataRegion::new(mito.clone());
380 let data_region = DataRegion::new(mito.clone());
381 let state = Arc::new(RwLock::default());
382 config.sanitize();
383 let flush_interval = config.flush_metadata_region_interval;
384 let inner = Arc::new(MetricEngineInner {
385 mito: mito.clone(),
386 metadata_region,
387 data_region,
388 state: state.clone(),
389 config,
390 row_modifier: RowModifier::new(),
391 flush_task: RepeatedTask::new(
392 flush_interval,
393 Box::new(FlushMetadataRegionTask {
394 state: state.clone(),
395 mito: mito.clone(),
396 }),
397 ),
398 });
399 inner
400 .flush_task
401 .start(common_runtime::global_runtime())
402 .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
403 Ok(Self { inner })
404 }
405
406 pub fn mito(&self) -> MitoEngine {
407 self.inner.mito.clone()
408 }
409
410 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
412 self.inner
413 .metadata_region
414 .logical_regions(physical_region_id)
415 .await
416 }
417
418 async fn handle_query(
420 &self,
421 region_id: RegionId,
422 request: ScanRequest,
423 ) -> Result<RegionScannerRef, BoxedError> {
424 self.inner
425 .read_region(region_id, request)
426 .await
427 .map_err(BoxedError::new)
428 }
429
430 async fn handle_requests(
431 &self,
432 requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
433 ) -> Result<RegionResponse, BoxedError> {
434 let mut affected_rows = 0;
435 let mut extensions = HashMap::new();
436 for (region_id, request) in requests {
437 let response = self.handle_request(region_id, request).await?;
438 affected_rows += response.affected_rows;
439 extensions.extend(response.extensions);
440 }
441
442 Ok(RegionResponse {
443 affected_rows,
444 extensions,
445 metadata: Vec::new(),
446 })
447 }
448}
449
#[cfg(test)]
impl MetricEngine {
    /// Test-only helper: scans `region_id` with `request` and returns the
    /// record batch stream produced by the inner engine.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        let inner = &self.inner;
        inner.scan_to_stream(region_id, request).await
    }

    /// Test-only helper: borrows the configuration the engine was built with.
    pub fn config(&self) -> &EngineConfig {
        let inner = &self.inner;
        &inner.config
    }
}
465
466struct MetricEngineInner {
467 mito: MitoEngine,
468 metadata_region: MetadataRegion,
469 data_region: DataRegion,
470 state: Arc<RwLock<MetricEngineState>>,
471 config: EngineConfig,
472 row_modifier: RowModifier,
473 flush_task: RepeatedTask<Error>,
474}
475
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
    use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};

    use super::*;
    use crate::test_util::TestEnv;

    /// Close and open requests must succeed both for the default physical
    /// region and for a region id that was never created.
    #[tokio::test]
    async fn close_open_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();
        let physical_id = env.default_physical_region_id();

        // Close the physical region ...
        let close_physical = RegionRequest::Close(RegionCloseRequest {});
        engine
            .handle_request(physical_id, close_physical)
            .await
            .unwrap();

        // ... and open it again with the physical-table option present
        // (empty value).
        let reopen_physical = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            region_dir: env.default_region_dir(),
            options: HashMap::from([(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]),
            skip_wal_replay: false,
        };
        engine
            .handle_request(physical_id, RegionRequest::Open(reopen_physical))
            .await
            .unwrap();

        // Closing a region that does not exist must not report an error.
        let missing_id = RegionId::new(12313, 12);
        let close_missing = RegionRequest::Close(RegionCloseRequest {});
        engine
            .handle_request(missing_id, close_missing)
            .await
            .unwrap();

        // Opening that region without any options must not report an error
        // either.
        let open_missing = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            region_dir: env.default_region_dir(),
            options: HashMap::new(),
            skip_wal_replay: false,
        };
        engine
            .handle_request(missing_id, RegionRequest::Open(open_missing))
            .await
            .unwrap();
    }

    /// Only the physical region exposes a role; the logical region has none.
    #[tokio::test]
    async fn test_role() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_id = env.default_logical_region_id();
        let physical_id = env.default_physical_region_id();

        let logical_role = env.metric().role(logical_id);
        assert!(logical_role.is_none());

        let physical_role = env.metric().role(physical_id);
        assert!(physical_role.is_some());
    }

    /// Only the physical region exposes a statistic; the logical region has
    /// none.
    #[tokio::test]
    async fn test_region_disk_usage() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_id = env.default_logical_region_id();
        let physical_id = env.default_physical_region_id();

        let logical_stat = env.metric().region_statistic(logical_id);
        assert!(logical_stat.is_none());

        let physical_stat = env.metric().region_statistic(physical_id);
        assert!(physical_stat.is_some());
    }
}