metric_engine/
data_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::SemanticType;
16use common_telemetry::{debug, info, warn};
17use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
18use mito2::engine::MitoEngine;
19use snafu::ResultExt;
20use store_api::metadata::ColumnMetadata;
21use store_api::region_engine::RegionEngine;
22use store_api::region_request::{
23    AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
24};
25use store_api::storage::consts::ReservedColumnId;
26use store_api::storage::{ConcreteDataType, RegionId};
27
28use crate::engine::IndexOptions;
29use crate::error::{
30    ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
31    MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
32};
33use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
34use crate::utils;
35
36/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
37/// will handle all the data related operations across physical tables. Thus
38/// every operation should be associated to a [RegionId], which is the physical
39/// table id + region sequence. This handler will transform the region group by
40/// itself.
41pub struct DataRegion {
42    mito: MitoEngine,
43}
44
45impl DataRegion {
46    pub fn new(mito: MitoEngine) -> Self {
47        Self { mito }
48    }
49
50    /// Submit an alter request to underlying physical region.
51    ///
52    /// This method will change the nullability of those given columns.
53    /// [SemanticType::Tag] will become nullable column as it's shared between
54    /// logical regions.
55    ///
56    /// Invoker don't need to set up or verify the column id. This method will adjust
57    /// it using underlying schema.
58    ///
59    /// This method will also set the nullable marker to true. All of those change are applies
60    /// to `columns` in-place.
61    pub async fn add_columns(
62        &self,
63        region_id: RegionId,
64        columns: Vec<ColumnMetadata>,
65        index_options: IndexOptions,
66    ) -> Result<()> {
67        // Return early if no new columns are added.
68        if columns.is_empty() {
69            return Ok(());
70        }
71
72        let region_id = utils::to_data_region_id(region_id);
73
74        let num_columns = columns.len();
75        let request = self
76            .assemble_alter_request(region_id, columns, index_options)
77            .await?;
78
79        let _timer = MITO_DDL_DURATION.start_timer();
80
81        let _ = self
82            .mito
83            .handle_request(region_id, request)
84            .await
85            .context(MitoWriteOperationSnafu)?;
86
87        PHYSICAL_COLUMN_COUNT.add(num_columns as _);
88
89        Ok(())
90    }
91
92    /// Generate wrapped [RegionAlterRequest] with given [ColumnMetadata].
93    /// This method will modify `columns` in-place.
94    async fn assemble_alter_request(
95        &self,
96        region_id: RegionId,
97        columns: Vec<ColumnMetadata>,
98        index_options: IndexOptions,
99    ) -> Result<RegionRequest> {
100        // retrieve underlying version
101        let region_metadata = self
102            .mito
103            .get_metadata(region_id)
104            .await
105            .context(MitoReadOperationSnafu)?;
106
107        // find the max column id
108        let new_column_id_start = 1 + region_metadata
109            .column_metadatas
110            .iter()
111            .filter_map(|c| {
112                if ReservedColumnId::is_reserved(c.column_id) {
113                    None
114                } else {
115                    Some(c.column_id)
116                }
117            })
118            .max()
119            .unwrap_or(0);
120
121        // overwrite semantic type
122        let new_columns = columns
123            .into_iter()
124            .enumerate()
125            .map(|(delta, mut c)| {
126                if c.semantic_type == SemanticType::Tag {
127                    if !c.column_schema.data_type.is_string() {
128                        return ColumnTypeMismatchSnafu {
129                            expect: ConcreteDataType::string_datatype(),
130                            actual: c.column_schema.data_type.clone(),
131                        }
132                        .fail();
133                    }
134                } else {
135                    warn!(
136                        "Column {} in region {region_id} is not a tag",
137                        c.column_schema.name
138                    );
139                };
140
141                c.column_id = new_column_id_start + delta as u32;
142                c.column_schema.set_nullable();
143                match index_options {
144                    IndexOptions::None => {}
145                    IndexOptions::Inverted => {
146                        c.column_schema.set_inverted_index(true);
147                    }
148                    IndexOptions::Skipping { granularity } => {
149                        c.column_schema
150                            .set_skipping_options(&SkippingIndexOptions {
151                                granularity,
152                                index_type: SkippingIndexType::BloomFilter,
153                            })
154                            .context(SetSkippingIndexOptionSnafu)?;
155                    }
156                }
157
158                Ok(AddColumn {
159                    column_metadata: c.clone(),
160                    location: None,
161                })
162            })
163            .collect::<Result<_>>()?;
164
165        debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
166        // assemble alter request
167        let alter_request = RegionRequest::Alter(RegionAlterRequest {
168            kind: AlterKind::AddColumns {
169                columns: new_columns,
170            },
171        });
172
173        Ok(alter_request)
174    }
175
176    pub async fn write_data(
177        &self,
178        region_id: RegionId,
179        request: RegionPutRequest,
180    ) -> Result<AffectedRows> {
181        let region_id = utils::to_data_region_id(region_id);
182        self.mito
183            .handle_request(region_id, RegionRequest::Put(request))
184            .await
185            .context(MitoWriteOperationSnafu)
186            .map(|result| result.affected_rows)
187    }
188
189    pub async fn physical_columns(
190        &self,
191        physical_region_id: RegionId,
192    ) -> Result<Vec<ColumnMetadata>> {
193        let data_region_id = utils::to_data_region_id(physical_region_id);
194        let metadata = self
195            .mito
196            .get_metadata(data_region_id)
197            .await
198            .context(MitoReadOperationSnafu)?;
199        Ok(metadata.column_metadatas.clone())
200    }
201
202    pub async fn alter_region_options(
203        &self,
204        region_id: RegionId,
205        request: RegionAlterRequest,
206    ) -> Result<AffectedRows> {
207        match request.kind {
208            AlterKind::SetRegionOptions { options: _ }
209            | AlterKind::UnsetRegionOptions { keys: _ } => {
210                let region_id = utils::to_data_region_id(region_id);
211                self.mito
212                    .handle_request(region_id, RegionRequest::Alter(request))
213                    .await
214                    .context(MitoWriteOperationSnafu)
215                    .map(|result| result.affected_rows)
216            }
217            _ => {
218                info!("Metric region received alter request {request:?} on physical region {region_id:?}");
219                FORBIDDEN_OPERATION_COUNT.inc();
220
221                ForbiddenPhysicalAlterSnafu.fail()
222            }
223        }
224    }
225}
226
#[cfg(test)]
mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;

    /// Builds a non-null tag column with a placeholder id; `add_columns` assigns the real id.
    fn tag_column(name: &str, data_type: ConcreteDataType) -> ColumnMetadata {
        ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: ColumnSchema::new(name, data_type, false),
        }
    }

    /// Finds a column by name, panicking with a descriptive message if it is absent.
    fn column_by_name<'a>(columns: &'a [ColumnMetadata], name: &str) -> &'a ColumnMetadata {
        columns
            .iter()
            .find(|c| c.column_schema.name == name)
            .unwrap_or_else(|| panic!("column {name} not found"))
    }

    #[tokio::test]
    async fn test_add_columns() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let data_region_id = utils::to_data_region_id(env.default_physical_region_id());

        let current_version = env
            .mito()
            .get_metadata(data_region_id)
            .await
            .unwrap()
            .schema_version;
        // TestEnv will create a logical region which changes the version to 1.
        assert_eq!(current_version, 1);

        let new_columns = vec![
            tag_column("tag2", ConcreteDataType::string_datatype()),
            tag_column("tag3", ConcreteDataType::string_datatype()),
        ];
        env.data_region()
            .add_columns(
                env.default_physical_region_id(),
                new_columns,
                IndexOptions::Inverted,
            )
            .await
            .unwrap();

        let new_metadata = env.mito().get_metadata(data_region_id).await.unwrap();
        let column_names = new_metadata
            .column_metadatas
            .iter()
            .map(|c| &c.column_schema.name)
            .collect::<Vec<_>>();
        let expected = vec![
            "greptime_timestamp",
            "greptime_value",
            "__table_id",
            "__tsid",
            "job",
            "tag2",
            "tag3",
        ];
        assert_eq!(column_names, expected);

        // Added columns receive consecutive ids and are forced to be nullable.
        let tag2 = column_by_name(&new_metadata.column_metadatas, "tag2");
        let tag3 = column_by_name(&new_metadata.column_metadatas, "tag3");
        assert_eq!(tag3.column_id, tag2.column_id + 1);
        assert!(tag2.column_schema.is_nullable());
        assert!(tag3.column_schema.is_nullable());
    }

    // Adding an empty column list must not issue any DDL.
    #[tokio::test]
    async fn test_add_no_columns() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let data_region_id = utils::to_data_region_id(env.default_physical_region_id());

        let version_before = env
            .mito()
            .get_metadata(data_region_id)
            .await
            .unwrap()
            .schema_version;

        env.data_region()
            .add_columns(
                env.default_physical_region_id(),
                vec![],
                IndexOptions::Inverted,
            )
            .await
            .unwrap();

        let version_after = env
            .mito()
            .get_metadata(data_region_id)
            .await
            .unwrap()
            .schema_version;
        assert_eq!(version_before, version_after);
    }

    // Only string is allowed for tag column
    #[tokio::test]
    async fn test_add_invalid_column() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let new_columns = vec![tag_column("tag2", ConcreteDataType::int64_datatype())];
        let result = env
            .data_region()
            .add_columns(
                env.default_physical_region_id(),
                new_columns,
                IndexOptions::Inverted,
            )
            .await;
        // Pin the specific failure so unrelated errors don't make this test pass.
        assert!(matches!(
            result,
            Err(crate::error::Error::ColumnTypeMismatch { .. })
        ));
    }
}