mod alter;
mod catchup;
mod close;
mod create;
mod drop;
mod flush;
mod open;
mod options;
mod put;
mod read;
mod region_metadata;
mod state;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use mito2::engine::MitoEngine;
pub(crate) use options::IndexOptions;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use self::state::MetricEngineState;
use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, MetricManifestInfoSnafu, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Clone)]
pub struct MetricEngine {
inner: Arc<MetricEngineInner>,
}
#[async_trait]
impl RegionEngine for MetricEngine {
fn name(&self) -> &str {
METRIC_ENGINE_NAME
}
async fn handle_batch_ddl_requests(
&self,
batch_request: BatchRegionDdlRequest,
) -> Result<RegionResponse, BoxedError> {
match batch_request {
BatchRegionDdlRequest::Create(requests) => {
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.create_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionDdlRequest::Alter(requests) => {
let mut extension_return_value = HashMap::new();
let rows = self
.inner
.alter_regions(requests, &mut extension_return_value)
.await
.map_err(BoxedError::new)?;
Ok(RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
BatchRegionDdlRequest::Drop(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
)
.await
}
}
}
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<RegionResponse, BoxedError> {
let mut extension_return_value = HashMap::new();
let result = match request {
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Create(create) => {
self.inner
.create_regions(vec![(region_id, create)], &mut extension_return_value)
.await
}
RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
RegionRequest::Alter(alter) => {
self.inner
.alter_regions(vec![(region_id, alter)], &mut extension_return_value)
.await
}
RegionRequest::Compact(_) => {
if self.inner.is_physical_region(region_id) {
self.inner
.mito
.handle_request(region_id, request)
.await
.context(error::MitoFlushOperationSnafu)
.map(|response| response.affected_rows)
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
RegionRequest::Delete(_) => {
if self.inner.is_physical_region(region_id) {
self.inner
.mito
.handle_request(region_id, request)
.await
.context(error::MitoDeleteOperationSnafu)
.map(|response| response.affected_rows)
} else {
UnsupportedRegionRequestSnafu { request }.fail()
}
}
RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
};
result.map_err(BoxedError::new).map(|rows| RegionResponse {
affected_rows: rows,
extensions: extension_return_value,
})
}
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
self.handle_query(region_id, request).await
}
async fn get_last_seq_num(
&self,
region_id: RegionId,
) -> Result<Option<SequenceNumber>, BoxedError> {
self.inner
.get_last_seq_num(region_id)
.await
.map_err(BoxedError::new)
}
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
self.inner
.load_region_metadata(region_id)
.await
.map_err(BoxedError::new)
}
fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
if self.inner.is_physical_region(region_id) {
let metadata_region_id = utils::to_metadata_region_id(region_id);
let data_region_id = utils::to_data_region_id(region_id);
let metadata_stat = self.inner.mito.region_statistic(metadata_region_id);
let data_stat = self.inner.mito.region_statistic(data_region_id);
match (metadata_stat, data_stat) {
(Some(metadata_stat), Some(data_stat)) => Some(RegionStatistic {
num_rows: metadata_stat.num_rows + data_stat.num_rows,
memtable_size: metadata_stat.memtable_size + data_stat.memtable_size,
wal_size: metadata_stat.wal_size + data_stat.wal_size,
manifest_size: metadata_stat.manifest_size + data_stat.manifest_size,
sst_size: metadata_stat.sst_size + data_stat.sst_size,
index_size: metadata_stat.index_size + data_stat.index_size,
manifest: RegionManifestInfo::Metric {
data_flushed_entry_id: data_stat.manifest.data_flushed_entry_id(),
data_manifest_version: data_stat.manifest.data_manifest_version(),
metadata_flushed_entry_id: metadata_stat.manifest.data_flushed_entry_id(),
metadata_manifest_version: metadata_stat.manifest.data_manifest_version(),
},
}),
_ => None,
}
} else {
None
}
}
async fn stop(&self) -> Result<(), BoxedError> {
Ok(())
}
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
for x in [
utils::to_metadata_region_id(region_id),
utils::to_data_region_id(region_id),
] {
if let Err(e) = self.inner.mito.set_region_role(x, role)
&& e.status_code() != StatusCode::RegionNotFound
{
return Err(e);
}
}
Ok(())
}
async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
if !manifest_info.is_metric() {
return Err(BoxedError::new(
MetricManifestInfoSnafu { region_id }.build(),
));
}
let metadata_region_id = utils::to_metadata_region_id(region_id);
let metadata_manifest_version = manifest_info
.metadata_manifest_version()
.unwrap_or_default();
let metadata_flushed_entry_id = manifest_info
.metadata_flushed_entry_id()
.unwrap_or_default();
let metadata_region_manifest =
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
self.inner
.mito
.sync_region(metadata_region_id, metadata_region_manifest)
.await?;
let data_region_id = utils::to_data_region_id(region_id);
let data_manifest_version = manifest_info.data_manifest_version();
let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
let data_region_manifest =
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
self.inner
.mito
.sync_region(data_region_id, data_region_manifest)
.await?;
Ok(())
}
async fn set_region_role_state_gracefully(
&self,
region_id: RegionId,
region_role_state: SettableRegionRoleState,
) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
self.inner
.mito
.set_region_role_state_gracefully(
utils::to_metadata_region_id(region_id),
region_role_state,
)
.await?;
self.inner
.mito
.set_region_role_state_gracefully(region_id, region_role_state)
.await
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
if self.inner.is_physical_region(region_id) {
self.inner.mito.role(region_id)
} else {
None
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl MetricEngine {
pub fn new(mito: MitoEngine, config: EngineConfig) -> Self {
let metadata_region = MetadataRegion::new(mito.clone());
let data_region = DataRegion::new(mito.clone());
Self {
inner: Arc::new(MetricEngineInner {
mito,
metadata_region,
data_region,
state: RwLock::default(),
config,
row_modifier: RowModifier::new(),
}),
}
}
pub fn mito(&self) -> MitoEngine {
self.inner.mito.clone()
}
pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
self.inner
.metadata_region
.logical_regions(physical_region_id)
.await
}
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
}
async fn handle_requests(
&self,
requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
) -> Result<RegionResponse, BoxedError> {
let mut affected_rows = 0;
let mut extensions = HashMap::new();
for (region_id, request) in requests {
let response = self.handle_request(region_id, request).await?;
affected_rows += response.affected_rows;
extensions.extend(response.extensions);
}
Ok(RegionResponse {
affected_rows,
extensions,
})
}
}
#[cfg(test)]
impl MetricEngine {
pub async fn scan_to_stream(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
self.inner.scan_to_stream(region_id, request).await
}
}
struct MetricEngineInner {
mito: MitoEngine,
metadata_region: MetadataRegion,
data_region: DataRegion,
state: RwLock<MetricEngineState>,
config: EngineConfig,
row_modifier: RowModifier,
}
#[cfg(test)]
mod test {
use std::collections::HashMap;
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn close_open_regions() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();
let physical_region_id = env.default_physical_region_id();
engine
.handle_request(
physical_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap();
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap();
let nonexistent_region_id = RegionId::new(12313, 12);
engine
.handle_request(
nonexistent_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap();
let invalid_open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: HashMap::new(),
skip_wal_replay: false,
};
engine
.handle_request(
nonexistent_region_id,
RegionRequest::Open(invalid_open_request),
)
.await
.unwrap();
}
#[tokio::test]
async fn test_role() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();
assert!(env.metric().role(logical_region_id).is_none());
assert!(env.metric().role(physical_region_id).is_some());
}
#[tokio::test]
async fn test_region_disk_usage() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();
assert!(env.metric().region_statistic(logical_region_id).is_none());
assert!(env.metric().region_statistic(physical_region_id).is_some());
}
}