use api::v1::SemanticType;
use common_telemetry::{debug, info, warn};
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ConcreteDataType, RegionId};
use crate::engine::IndexOptions;
use crate::error::{
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
use crate::utils;
pub struct DataRegion {
mito: MitoEngine,
}
impl DataRegion {
pub fn new(mito: MitoEngine) -> Self {
Self { mito }
}
pub async fn add_columns(
&self,
region_id: RegionId,
columns: Vec<ColumnMetadata>,
index_options: IndexOptions,
) -> Result<()> {
if columns.is_empty() {
return Ok(());
}
let region_id = utils::to_data_region_id(region_id);
let num_columns = columns.len();
let request = self
.assemble_alter_request(region_id, columns, index_options)
.await?;
let _timer = MITO_DDL_DURATION.start_timer();
let _ = self
.mito
.handle_request(region_id, request)
.await
.context(MitoWriteOperationSnafu)?;
PHYSICAL_COLUMN_COUNT.add(num_columns as _);
Ok(())
}
async fn assemble_alter_request(
&self,
region_id: RegionId,
columns: Vec<ColumnMetadata>,
index_options: IndexOptions,
) -> Result<RegionRequest> {
let region_metadata = self
.mito
.get_metadata(region_id)
.await
.context(MitoReadOperationSnafu)?;
let version = region_metadata.schema_version;
let new_column_id_start = 1 + region_metadata
.column_metadatas
.iter()
.filter_map(|c| {
if ReservedColumnId::is_reserved(c.column_id) {
None
} else {
Some(c.column_id)
}
})
.max()
.unwrap_or(0);
let new_columns = columns
.into_iter()
.enumerate()
.map(|(delta, mut c)| {
if c.semantic_type == SemanticType::Tag {
if !c.column_schema.data_type.is_string() {
return ColumnTypeMismatchSnafu {
expect: ConcreteDataType::string_datatype(),
actual: c.column_schema.data_type.clone(),
}
.fail();
}
} else {
warn!(
"Column {} in region {region_id} is not a tag",
c.column_schema.name
);
};
c.column_id = new_column_id_start + delta as u32;
c.column_schema.set_nullable();
match index_options {
IndexOptions::None => {}
IndexOptions::Inverted => {
c.column_schema.set_inverted_index(true);
}
IndexOptions::Skipping { granularity } => {
c.column_schema
.set_skipping_options(&SkippingIndexOptions {
granularity,
index_type: SkippingIndexType::BloomFilter,
})
.context(SetSkippingIndexOptionSnafu)?;
}
}
Ok(AddColumn {
column_metadata: c.clone(),
location: None,
})
})
.collect::<Result<_>>()?;
debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
let alter_request = RegionRequest::Alter(RegionAlterRequest {
schema_version: version,
kind: AlterKind::AddColumns {
columns: new_columns,
},
});
Ok(alter_request)
}
pub async fn write_data(
&self,
region_id: RegionId,
request: RegionPutRequest,
) -> Result<AffectedRows> {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Put(request))
.await
.context(MitoWriteOperationSnafu)
.map(|result| result.affected_rows)
}
pub async fn physical_columns(
&self,
physical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata = self
.mito
.get_metadata(data_region_id)
.await
.context(MitoReadOperationSnafu)?;
Ok(metadata.column_metadatas.clone())
}
pub async fn alter_region_options(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<AffectedRows> {
match request.kind {
AlterKind::SetRegionOptions { options: _ }
| AlterKind::UnsetRegionOptions { keys: _ } => {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Alter(request))
.await
.context(MitoWriteOperationSnafu)
.map(|result| result.affected_rows)
}
_ => {
info!("Metric region received alter request {request:?} on physical region {region_id:?}");
FORBIDDEN_OPERATION_COUNT.inc();
ForbiddenPhysicalAlterSnafu.fail()
}
}
}
}
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_add_columns() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let current_version = env
.mito()
.get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
.await
.unwrap()
.schema_version;
assert_eq!(current_version, 1);
let new_columns = vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag2",
ConcreteDataType::string_datatype(),
false,
),
},
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag3",
ConcreteDataType::string_datatype(),
false,
),
},
];
env.data_region()
.add_columns(
env.default_physical_region_id(),
new_columns,
IndexOptions::Inverted,
)
.await
.unwrap();
let new_metadata = env
.mito()
.get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
.await
.unwrap();
let column_names = new_metadata
.column_metadatas
.iter()
.map(|c| &c.column_schema.name)
.collect::<Vec<_>>();
let expected = vec![
"greptime_timestamp",
"greptime_value",
"__table_id",
"__tsid",
"job",
"tag2",
"tag3",
];
assert_eq!(column_names, expected);
}
#[tokio::test]
async fn test_add_invalid_column() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let new_columns = vec![ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
}];
let result = env
.data_region()
.add_columns(
env.default_physical_region_id(),
new_columns,
IndexOptions::Inverted,
)
.await;
assert!(result.is_err());
}
}