metric_engine/
data_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::SemanticType;
16use common_telemetry::{debug, info, warn};
17use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
18use mito2::engine::MitoEngine;
19use snafu::ResultExt;
20use store_api::metadata::ColumnMetadata;
21use store_api::region_engine::RegionEngine;
22use store_api::region_request::{
23    AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
24};
25use store_api::storage::consts::ReservedColumnId;
26use store_api::storage::{ConcreteDataType, RegionId};
27
28use crate::engine::IndexOptions;
29use crate::error::{
30    ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
31    MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
32};
33use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
34use crate::utils;
35
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// handles all the data related operations across physical tables by forwarding them
/// to the underlying [MitoEngine].
///
/// Every operation is associated with a [RegionId] of a physical region (physical
/// table id + region sequence). Callers don't need to pick the data region themselves:
/// each method converts the given id with `utils::to_data_region_id` before talking to
/// the engine.
pub struct DataRegion {
    /// The mito engine that stores the physical data regions.
    mito: MitoEngine,
}
44
45impl DataRegion {
46    pub fn new(mito: MitoEngine) -> Self {
47        Self { mito }
48    }
49
50    /// Submit an alter request to underlying physical region.
51    ///
52    /// This method will change the nullability of those given columns.
53    /// [SemanticType::Tag] will become nullable column as it's shared between
54    /// logical regions.
55    ///
56    /// Invoker don't need to set up or verify the column id. This method will adjust
57    /// it using underlying schema.
58    ///
59    /// This method will also set the nullable marker to true. All of those change are applies
60    /// to `columns` in-place.
61    pub async fn add_columns(
62        &self,
63        region_id: RegionId,
64        columns: Vec<ColumnMetadata>,
65        index_options: IndexOptions,
66    ) -> Result<()> {
67        // Return early if no new columns are added.
68        if columns.is_empty() {
69            return Ok(());
70        }
71
72        let region_id = utils::to_data_region_id(region_id);
73
74        let num_columns = columns.len();
75        let request = self
76            .assemble_alter_request(region_id, columns, index_options)
77            .await?;
78
79        let _timer = MITO_DDL_DURATION.start_timer();
80
81        let _ = self
82            .mito
83            .handle_request(region_id, request)
84            .await
85            .context(MitoWriteOperationSnafu)?;
86
87        PHYSICAL_COLUMN_COUNT.add(num_columns as _);
88
89        Ok(())
90    }
91
92    /// Generate wrapped [RegionAlterRequest] with given [ColumnMetadata].
93    /// This method will modify `columns` in-place.
94    async fn assemble_alter_request(
95        &self,
96        region_id: RegionId,
97        columns: Vec<ColumnMetadata>,
98        index_options: IndexOptions,
99    ) -> Result<RegionRequest> {
100        // retrieve underlying version
101        let region_metadata = self
102            .mito
103            .get_metadata(region_id)
104            .await
105            .context(MitoReadOperationSnafu)?;
106
107        // find the max column id
108        let new_column_id_start = 1 + region_metadata
109            .column_metadatas
110            .iter()
111            .filter_map(|c| {
112                if ReservedColumnId::is_reserved(c.column_id) {
113                    None
114                } else {
115                    Some(c.column_id)
116                }
117            })
118            .max()
119            .unwrap_or(0);
120
121        // overwrite semantic type
122        let new_columns = columns
123            .into_iter()
124            .enumerate()
125            .map(|(delta, mut c)| {
126                if c.semantic_type == SemanticType::Tag {
127                    if !c.column_schema.data_type.is_string() {
128                        return ColumnTypeMismatchSnafu {
129                            expect: ConcreteDataType::string_datatype(),
130                            actual: c.column_schema.data_type.clone(),
131                        }
132                        .fail();
133                    }
134                } else {
135                    warn!(
136                        "Column {} in region {region_id} is not a tag",
137                        c.column_schema.name
138                    );
139                };
140
141                c.column_id = new_column_id_start + delta as u32;
142                c.column_schema.set_nullable();
143                match index_options {
144                    IndexOptions::None => {}
145                    IndexOptions::Inverted => {
146                        c.column_schema.set_inverted_index(true);
147                    }
148                    IndexOptions::Skipping {
149                        granularity,
150                        false_positive_rate,
151                    } => {
152                        c.column_schema
153                            .set_skipping_options(
154                                &SkippingIndexOptions::new(
155                                    granularity,
156                                    false_positive_rate,
157                                    SkippingIndexType::BloomFilter,
158                                )
159                                .context(SetSkippingIndexOptionSnafu)?,
160                            )
161                            .context(SetSkippingIndexOptionSnafu)?;
162                    }
163                }
164
165                Ok(AddColumn {
166                    column_metadata: c.clone(),
167                    location: None,
168                })
169            })
170            .collect::<Result<_>>()?;
171
172        debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
173        // assemble alter request
174        let alter_request = RegionRequest::Alter(RegionAlterRequest {
175            kind: AlterKind::AddColumns {
176                columns: new_columns,
177            },
178        });
179
180        Ok(alter_request)
181    }
182
183    pub async fn write_data(
184        &self,
185        region_id: RegionId,
186        request: RegionPutRequest,
187    ) -> Result<AffectedRows> {
188        let region_id = utils::to_data_region_id(region_id);
189        self.mito
190            .handle_request(region_id, RegionRequest::Put(request))
191            .await
192            .context(MitoWriteOperationSnafu)
193            .map(|result| result.affected_rows)
194    }
195
196    pub async fn physical_columns(
197        &self,
198        physical_region_id: RegionId,
199    ) -> Result<Vec<ColumnMetadata>> {
200        let data_region_id = utils::to_data_region_id(physical_region_id);
201        let metadata = self
202            .mito
203            .get_metadata(data_region_id)
204            .await
205            .context(MitoReadOperationSnafu)?;
206        Ok(metadata.column_metadatas.clone())
207    }
208
209    pub async fn alter_region_options(
210        &self,
211        region_id: RegionId,
212        request: RegionAlterRequest,
213    ) -> Result<AffectedRows> {
214        match request.kind {
215            AlterKind::SetRegionOptions { options: _ }
216            | AlterKind::UnsetRegionOptions { keys: _ }
217            | AlterKind::SetIndex { options: _ }
218            | AlterKind::UnsetIndex { options: _ } => {
219                let region_id = utils::to_data_region_id(region_id);
220                self.mito
221                    .handle_request(region_id, RegionRequest::Alter(request))
222                    .await
223                    .context(MitoWriteOperationSnafu)
224                    .map(|result| result.affected_rows)
225            }
226            _ => {
227                info!("Metric region received alter request {request:?} on physical region {region_id:?}");
228                FORBIDDEN_OPERATION_COUNT.inc();
229
230                ForbiddenPhysicalAlterSnafu.fail()
231            }
232        }
233    }
234}
235
#[cfg(test)]
mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;

    /// Builds a non-nullable tag column named `name` with the given data type.
    ///
    /// The column id is left as 0: `DataRegion::add_columns` assigns ids on its own.
    fn tag_column(name: &str, data_type: ConcreteDataType) -> ColumnMetadata {
        ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: ColumnSchema::new(name, data_type, false),
        }
    }

    #[tokio::test]
    async fn test_add_columns() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();
        let data_region_id = utils::to_data_region_id(physical_region_id);

        // TestEnv will create a logical region which changes the version to 1.
        let version_before = env
            .mito()
            .get_metadata(data_region_id)
            .await
            .unwrap()
            .schema_version;
        assert_eq!(version_before, 1);

        let columns_to_add = ["tag2", "tag3"]
            .into_iter()
            .map(|name| tag_column(name, ConcreteDataType::string_datatype()))
            .collect::<Vec<_>>();
        env.data_region()
            .add_columns(physical_region_id, columns_to_add, IndexOptions::Inverted)
            .await
            .unwrap();

        let metadata_after = env.mito().get_metadata(data_region_id).await.unwrap();
        let actual_names: Vec<&str> = metadata_after
            .column_metadatas
            .iter()
            .map(|column| column.column_schema.name.as_str())
            .collect();
        assert_eq!(
            actual_names,
            [
                "greptime_timestamp",
                "greptime_value",
                "__table_id",
                "__tsid",
                "job",
                "tag2",
                "tag3",
            ]
        );
    }

    // Only string is allowed for tag column
    #[tokio::test]
    async fn test_add_invalid_column() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let invalid_columns = vec![tag_column("tag2", ConcreteDataType::int64_datatype())];
        let outcome = env
            .data_region()
            .add_columns(
                env.default_physical_region_id(),
                invalid_columns,
                IndexOptions::Inverted,
            )
            .await;
        assert!(outcome.is_err());
    }
}