metric_engine/
data_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::SemanticType;
16use common_telemetry::{debug, info, warn};
17use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
18use mito2::engine::MitoEngine;
19use snafu::ResultExt;
20use store_api::metadata::ColumnMetadata;
21use store_api::region_engine::RegionEngine;
22use store_api::region_request::{
23    AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
24};
25use store_api::storage::consts::ReservedColumnId;
26use store_api::storage::{ConcreteDataType, RegionId};
27
28use crate::engine::IndexOptions;
29use crate::error::{
30    ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
31    MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
32};
33use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
34use crate::utils;
35
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the data related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct DataRegion {
    /// Underlying mito engine; metadata reads and put/alter requests are forwarded to it.
    mito: MitoEngine,
}
44
45impl DataRegion {
46    pub fn new(mito: MitoEngine) -> Self {
47        Self { mito }
48    }
49
50    /// Submit an alter request to underlying physical region.
51    ///
52    /// This method will change the nullability of those given columns.
53    /// [SemanticType::Tag] will become nullable column as it's shared between
54    /// logical regions.
55    ///
56    /// Invoker don't need to set up or verify the column id. This method will adjust
57    /// it using underlying schema.
58    ///
59    /// This method will also set the nullable marker to true. All of those change are applies
60    /// to `columns` in-place.
61    pub async fn add_columns(
62        &self,
63        region_id: RegionId,
64        columns: Vec<ColumnMetadata>,
65        index_options: IndexOptions,
66    ) -> Result<()> {
67        // Return early if no new columns are added.
68        if columns.is_empty() {
69            return Ok(());
70        }
71
72        let region_id = utils::to_data_region_id(region_id);
73
74        let num_columns = columns.len();
75        let request = self
76            .assemble_alter_request(region_id, columns, index_options)
77            .await?;
78
79        let _timer = MITO_DDL_DURATION.start_timer();
80
81        let _ = self
82            .mito
83            .handle_request(region_id, request)
84            .await
85            .context(MitoWriteOperationSnafu)?;
86
87        PHYSICAL_COLUMN_COUNT.add(num_columns as _);
88
89        Ok(())
90    }
91
92    /// Generate wrapped [RegionAlterRequest] with given [ColumnMetadata].
93    /// This method will modify `columns` in-place.
94    async fn assemble_alter_request(
95        &self,
96        region_id: RegionId,
97        columns: Vec<ColumnMetadata>,
98        index_options: IndexOptions,
99    ) -> Result<RegionRequest> {
100        // retrieve underlying version
101        let region_metadata = self
102            .mito
103            .get_metadata(region_id)
104            .await
105            .context(MitoReadOperationSnafu)?;
106
107        // find the max column id
108        let new_column_id_start = 1 + region_metadata
109            .column_metadatas
110            .iter()
111            .filter_map(|c| {
112                if ReservedColumnId::is_reserved(c.column_id) {
113                    None
114                } else {
115                    Some(c.column_id)
116                }
117            })
118            .max()
119            .unwrap_or(0);
120
121        // overwrite semantic type
122        let new_columns = columns
123            .into_iter()
124            .enumerate()
125            .map(|(delta, mut c)| {
126                if c.semantic_type == SemanticType::Tag {
127                    if !c.column_schema.data_type.is_string() {
128                        return ColumnTypeMismatchSnafu {
129                            expect: ConcreteDataType::string_datatype(),
130                            actual: c.column_schema.data_type.clone(),
131                        }
132                        .fail();
133                    }
134                } else {
135                    warn!(
136                        "Column {} in region {region_id} is not a tag",
137                        c.column_schema.name
138                    );
139                };
140
141                c.column_id = new_column_id_start + delta as u32;
142                c.column_schema.set_nullable();
143                match index_options {
144                    IndexOptions::None => {}
145                    IndexOptions::Inverted => {
146                        c.column_schema.set_inverted_index(true);
147                    }
148                    IndexOptions::Skipping {
149                        granularity,
150                        false_positive_rate,
151                    } => {
152                        c.column_schema
153                            .set_skipping_options(
154                                &SkippingIndexOptions::new(
155                                    granularity,
156                                    false_positive_rate,
157                                    SkippingIndexType::BloomFilter,
158                                )
159                                .context(SetSkippingIndexOptionSnafu)?,
160                            )
161                            .context(SetSkippingIndexOptionSnafu)?;
162                    }
163                }
164
165                Ok(AddColumn {
166                    column_metadata: c.clone(),
167                    location: None,
168                })
169            })
170            .collect::<Result<_>>()?;
171
172        debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
173        // assemble alter request
174        let alter_request = RegionRequest::Alter(RegionAlterRequest {
175            kind: AlterKind::AddColumns {
176                columns: new_columns,
177            },
178        });
179
180        Ok(alter_request)
181    }
182
183    pub async fn write_data(
184        &self,
185        region_id: RegionId,
186        request: RegionPutRequest,
187    ) -> Result<AffectedRows> {
188        let region_id = utils::to_data_region_id(region_id);
189        self.mito
190            .handle_request(region_id, RegionRequest::Put(request))
191            .await
192            .context(MitoWriteOperationSnafu)
193            .map(|result| result.affected_rows)
194    }
195
196    pub async fn physical_columns(
197        &self,
198        physical_region_id: RegionId,
199    ) -> Result<Vec<ColumnMetadata>> {
200        let data_region_id = utils::to_data_region_id(physical_region_id);
201        let metadata = self
202            .mito
203            .get_metadata(data_region_id)
204            .await
205            .context(MitoReadOperationSnafu)?;
206        Ok(metadata.column_metadatas.clone())
207    }
208
209    pub async fn alter_region_options(
210        &self,
211        region_id: RegionId,
212        request: RegionAlterRequest,
213    ) -> Result<AffectedRows> {
214        match request.kind {
215            AlterKind::SetRegionOptions { options: _ }
216            | AlterKind::UnsetRegionOptions { keys: _ }
217            | AlterKind::SetIndexes { options: _ }
218            | AlterKind::UnsetIndexes { options: _ }
219            | AlterKind::SyncColumns {
220                column_metadatas: _,
221            } => {
222                let region_id = utils::to_data_region_id(region_id);
223                self.mito
224                    .handle_request(region_id, RegionRequest::Alter(request))
225                    .await
226                    .context(MitoWriteOperationSnafu)
227                    .map(|result| result.affected_rows)
228            }
229            _ => {
230                info!("Metric region received alter request {request:?} on physical region {region_id:?}");
231                FORBIDDEN_OPERATION_COUNT.inc();
232
233                ForbiddenPhysicalAlterSnafu.fail()
234            }
235        }
236    }
237}
238
239#[cfg(test)]
240mod test {
241    use datatypes::prelude::ConcreteDataType;
242    use datatypes::schema::ColumnSchema;
243
244    use super::*;
245    use crate::test_util::TestEnv;
246
247    #[tokio::test]
248    async fn test_add_columns() {
249        let env = TestEnv::new().await;
250        env.init_metric_region().await;
251
252        let current_version = env
253            .mito()
254            .get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
255            .await
256            .unwrap()
257            .schema_version;
258        // TestEnv will create a logical region which changes the version to 1.
259        assert_eq!(current_version, 1);
260
261        let new_columns = vec![
262            ColumnMetadata {
263                column_id: 0,
264                semantic_type: SemanticType::Tag,
265                column_schema: ColumnSchema::new(
266                    "tag2",
267                    ConcreteDataType::string_datatype(),
268                    false,
269                ),
270            },
271            ColumnMetadata {
272                column_id: 0,
273                semantic_type: SemanticType::Tag,
274                column_schema: ColumnSchema::new(
275                    "tag3",
276                    ConcreteDataType::string_datatype(),
277                    false,
278                ),
279            },
280        ];
281        env.data_region()
282            .add_columns(
283                env.default_physical_region_id(),
284                new_columns,
285                IndexOptions::Inverted,
286            )
287            .await
288            .unwrap();
289
290        let new_metadata = env
291            .mito()
292            .get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
293            .await
294            .unwrap();
295        let column_names = new_metadata
296            .column_metadatas
297            .iter()
298            .map(|c| &c.column_schema.name)
299            .collect::<Vec<_>>();
300        let expected = vec![
301            "greptime_timestamp",
302            "greptime_value",
303            "__table_id",
304            "__tsid",
305            "job",
306            "tag2",
307            "tag3",
308        ];
309        assert_eq!(column_names, expected);
310    }
311
312    // Only string is allowed for tag column
313    #[tokio::test]
314    async fn test_add_invalid_column() {
315        let env = TestEnv::new().await;
316        env.init_metric_region().await;
317
318        let new_columns = vec![ColumnMetadata {
319            column_id: 0,
320            semantic_type: SemanticType::Tag,
321            column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
322        }];
323        let result = env
324            .data_region()
325            .add_columns(
326                env.default_physical_region_id(),
327                new_columns,
328                IndexOptions::Inverted,
329            )
330            .await;
331        assert!(result.is_err());
332    }
333}