1use api::v1::SemanticType;
16use common_telemetry::{debug, info, warn};
17use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
18use mito2::engine::MitoEngine;
19use snafu::ResultExt;
20use store_api::metadata::ColumnMetadata;
21use store_api::region_engine::RegionEngine;
22use store_api::region_request::{
23 AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
24};
25use store_api::storage::consts::ReservedColumnId;
26use store_api::storage::{ConcreteDataType, RegionId};
27
28use crate::engine::IndexOptions;
29use crate::error::{
30 ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
31 MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
32};
33use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
34use crate::utils;
35
/// Thin wrapper around a [`MitoEngine`] through which every data-region
/// request in this file (alter, put, metadata lookup) is issued.
pub struct DataRegion {
    /// Engine that executes the read and write requests sent by this type.
    mito: MitoEngine,
}
44
45impl DataRegion {
46 pub fn new(mito: MitoEngine) -> Self {
47 Self { mito }
48 }
49
50 pub async fn add_columns(
62 &self,
63 region_id: RegionId,
64 columns: Vec<ColumnMetadata>,
65 index_options: IndexOptions,
66 ) -> Result<()> {
67 if columns.is_empty() {
69 return Ok(());
70 }
71
72 let region_id = utils::to_data_region_id(region_id);
73
74 let num_columns = columns.len();
75 let request = self
76 .assemble_alter_request(region_id, columns, index_options)
77 .await?;
78
79 let _timer = MITO_DDL_DURATION.start_timer();
80
81 let _ = self
82 .mito
83 .handle_request(region_id, request)
84 .await
85 .context(MitoWriteOperationSnafu)?;
86
87 PHYSICAL_COLUMN_COUNT.add(num_columns as _);
88
89 Ok(())
90 }
91
92 async fn assemble_alter_request(
95 &self,
96 region_id: RegionId,
97 columns: Vec<ColumnMetadata>,
98 index_options: IndexOptions,
99 ) -> Result<RegionRequest> {
100 let region_metadata = self
102 .mito
103 .get_metadata(region_id)
104 .await
105 .context(MitoReadOperationSnafu)?;
106
107 let new_column_id_start = 1 + region_metadata
109 .column_metadatas
110 .iter()
111 .filter_map(|c| {
112 if ReservedColumnId::is_reserved(c.column_id) {
113 None
114 } else {
115 Some(c.column_id)
116 }
117 })
118 .max()
119 .unwrap_or(0);
120
121 let new_columns = columns
123 .into_iter()
124 .enumerate()
125 .map(|(delta, mut c)| {
126 if c.semantic_type == SemanticType::Tag {
127 if !c.column_schema.data_type.is_string() {
128 return ColumnTypeMismatchSnafu {
129 expect: ConcreteDataType::string_datatype(),
130 actual: c.column_schema.data_type.clone(),
131 }
132 .fail();
133 }
134 } else {
135 warn!(
136 "Column {} in region {region_id} is not a tag",
137 c.column_schema.name
138 );
139 };
140
141 c.column_id = new_column_id_start + delta as u32;
142 c.column_schema.set_nullable();
143 match index_options {
144 IndexOptions::None => {}
145 IndexOptions::Inverted => {
146 c.column_schema.set_inverted_index(true);
147 }
148 IndexOptions::Skipping {
149 granularity,
150 false_positive_rate,
151 } => {
152 c.column_schema
153 .set_skipping_options(
154 &SkippingIndexOptions::new(
155 granularity,
156 false_positive_rate,
157 SkippingIndexType::BloomFilter,
158 )
159 .context(SetSkippingIndexOptionSnafu)?,
160 )
161 .context(SetSkippingIndexOptionSnafu)?;
162 }
163 }
164
165 Ok(AddColumn {
166 column_metadata: c.clone(),
167 location: None,
168 })
169 })
170 .collect::<Result<_>>()?;
171
172 debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
173 let alter_request = RegionRequest::Alter(RegionAlterRequest {
175 kind: AlterKind::AddColumns {
176 columns: new_columns,
177 },
178 });
179
180 Ok(alter_request)
181 }
182
183 pub async fn write_data(
184 &self,
185 region_id: RegionId,
186 request: RegionPutRequest,
187 ) -> Result<AffectedRows> {
188 let region_id = utils::to_data_region_id(region_id);
189 self.mito
190 .handle_request(region_id, RegionRequest::Put(request))
191 .await
192 .context(MitoWriteOperationSnafu)
193 .map(|result| result.affected_rows)
194 }
195
196 pub async fn physical_columns(
197 &self,
198 physical_region_id: RegionId,
199 ) -> Result<Vec<ColumnMetadata>> {
200 let data_region_id = utils::to_data_region_id(physical_region_id);
201 let metadata = self
202 .mito
203 .get_metadata(data_region_id)
204 .await
205 .context(MitoReadOperationSnafu)?;
206 Ok(metadata.column_metadatas.clone())
207 }
208
209 pub async fn alter_region_options(
210 &self,
211 region_id: RegionId,
212 request: RegionAlterRequest,
213 ) -> Result<AffectedRows> {
214 match request.kind {
215 AlterKind::SetRegionOptions { options: _ }
216 | AlterKind::UnsetRegionOptions { keys: _ }
217 | AlterKind::SetIndexes { options: _ }
218 | AlterKind::UnsetIndexes { options: _ }
219 | AlterKind::SyncColumns {
220 column_metadatas: _,
221 } => {
222 let region_id = utils::to_data_region_id(region_id);
223 self.mito
224 .handle_request(region_id, RegionRequest::Alter(request))
225 .await
226 .context(MitoWriteOperationSnafu)
227 .map(|result| result.affected_rows)
228 }
229 _ => {
230 info!(
231 "Metric region received alter request {request:?} on physical region {region_id:?}"
232 );
233 FORBIDDEN_OPERATION_COUNT.inc();
234
235 ForbiddenPhysicalAlterSnafu.fail()
236 }
237 }
238 }
239}
240
#[cfg(test)]
mod test {
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;

    /// Builds a non-nullable tag column named `name` of type `data_type`,
    /// with a placeholder column id of 0.
    fn tag_column(name: &str, data_type: ConcreteDataType) -> ColumnMetadata {
        ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: ColumnSchema::new(name, data_type, false),
        }
    }

    /// Adding two string tags appends them after the existing columns.
    #[tokio::test]
    async fn test_add_columns() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let data_region_id = utils::to_data_region_id(env.default_physical_region_id());

        // The freshly initialized region is expected to be at schema version 1.
        let initial_metadata = env.mito().get_metadata(data_region_id).await.unwrap();
        assert_eq!(initial_metadata.schema_version, 1);

        let string_tags = vec![
            tag_column("tag2", ConcreteDataType::string_datatype()),
            tag_column("tag3", ConcreteDataType::string_datatype()),
        ];
        env.data_region()
            .add_columns(
                env.default_physical_region_id(),
                string_tags,
                IndexOptions::Inverted,
            )
            .await
            .unwrap();

        let updated_metadata = env.mito().get_metadata(data_region_id).await.unwrap();
        let column_names: Vec<_> = updated_metadata
            .column_metadatas
            .iter()
            .map(|column| &column.column_schema.name)
            .collect();
        let expected = vec![
            greptime_timestamp(),
            greptime_value(),
            "__table_id",
            "__tsid",
            "job",
            "tag2",
            "tag3",
        ];
        assert_eq!(column_names, expected);
    }

    /// A tag column whose type is not string must be rejected.
    #[tokio::test]
    async fn test_add_invalid_column() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let int_tag = vec![tag_column("tag2", ConcreteDataType::int64_datatype())];
        let outcome = env
            .data_region()
            .add_columns(
                env.default_physical_region_id(),
                int_tag,
                IndexOptions::Inverted,
            )
            .await;
        assert!(outcome.is_err());
    }
}