metric_engine/
data_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::SemanticType;
16use common_telemetry::{debug, info, warn};
17use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
18use mito2::engine::MitoEngine;
19use snafu::ResultExt;
20use store_api::metadata::ColumnMetadata;
21use store_api::region_engine::RegionEngine;
22use store_api::region_request::{
23    AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
24};
25use store_api::storage::consts::ReservedColumnId;
26use store_api::storage::{ConcreteDataType, RegionId};
27
28use crate::engine::IndexOptions;
29use crate::error::{
30    ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
31    MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
32};
33use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
34use crate::utils;
35
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the data related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself: its public methods map the given id with `utils::to_data_region_id`
/// before talking to the underlying engine.
pub struct DataRegion {
    /// Underlying mito engine that all requests are forwarded to.
    mito: MitoEngine,
}
44
45impl DataRegion {
46    pub fn new(mito: MitoEngine) -> Self {
47        Self { mito }
48    }
49
50    /// Submit an alter request to underlying physical region.
51    ///
52    /// This method will change the nullability of those given columns.
53    /// [SemanticType::Tag] will become nullable column as it's shared between
54    /// logical regions.
55    ///
56    /// Invoker don't need to set up or verify the column id. This method will adjust
57    /// it using underlying schema.
58    ///
59    /// This method will also set the nullable marker to true. All of those change are applies
60    /// to `columns` in-place.
61    pub async fn add_columns(
62        &self,
63        region_id: RegionId,
64        columns: Vec<ColumnMetadata>,
65        index_options: IndexOptions,
66    ) -> Result<()> {
67        // Return early if no new columns are added.
68        if columns.is_empty() {
69            return Ok(());
70        }
71
72        let region_id = utils::to_data_region_id(region_id);
73
74        let num_columns = columns.len();
75        let request = self
76            .assemble_alter_request(region_id, columns, index_options)
77            .await?;
78
79        let _timer = MITO_DDL_DURATION.start_timer();
80
81        let _ = self
82            .mito
83            .handle_request(region_id, request)
84            .await
85            .context(MitoWriteOperationSnafu)?;
86
87        PHYSICAL_COLUMN_COUNT.add(num_columns as _);
88
89        Ok(())
90    }
91
92    /// Generate wrapped [RegionAlterRequest] with given [ColumnMetadata].
93    /// This method will modify `columns` in-place.
94    async fn assemble_alter_request(
95        &self,
96        region_id: RegionId,
97        columns: Vec<ColumnMetadata>,
98        index_options: IndexOptions,
99    ) -> Result<RegionRequest> {
100        // retrieve underlying version
101        let region_metadata = self
102            .mito
103            .get_metadata(region_id)
104            .await
105            .context(MitoReadOperationSnafu)?;
106
107        // find the max column id
108        let new_column_id_start = 1 + region_metadata
109            .column_metadatas
110            .iter()
111            .filter_map(|c| {
112                if ReservedColumnId::is_reserved(c.column_id) {
113                    None
114                } else {
115                    Some(c.column_id)
116                }
117            })
118            .max()
119            .unwrap_or(0);
120
121        // overwrite semantic type
122        let new_columns = columns
123            .into_iter()
124            .enumerate()
125            .map(|(delta, mut c)| {
126                if c.semantic_type == SemanticType::Tag {
127                    if !c.column_schema.data_type.is_string() {
128                        return ColumnTypeMismatchSnafu {
129                            expect: ConcreteDataType::string_datatype(),
130                            actual: c.column_schema.data_type.clone(),
131                        }
132                        .fail();
133                    }
134                } else {
135                    warn!(
136                        "Column {} in region {region_id} is not a tag",
137                        c.column_schema.name
138                    );
139                };
140
141                c.column_id = new_column_id_start + delta as u32;
142                c.column_schema.set_nullable();
143                match index_options {
144                    IndexOptions::None => {}
145                    IndexOptions::Inverted => {
146                        c.column_schema.set_inverted_index(true);
147                    }
148                    IndexOptions::Skipping {
149                        granularity,
150                        false_positive_rate,
151                    } => {
152                        c.column_schema
153                            .set_skipping_options(
154                                &SkippingIndexOptions::new(
155                                    granularity,
156                                    false_positive_rate,
157                                    SkippingIndexType::BloomFilter,
158                                )
159                                .context(SetSkippingIndexOptionSnafu)?,
160                            )
161                            .context(SetSkippingIndexOptionSnafu)?;
162                    }
163                }
164
165                Ok(AddColumn {
166                    column_metadata: c.clone(),
167                    location: None,
168                })
169            })
170            .collect::<Result<_>>()?;
171
172        debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
173        // assemble alter request
174        let alter_request = RegionRequest::Alter(RegionAlterRequest {
175            kind: AlterKind::AddColumns {
176                columns: new_columns,
177            },
178        });
179
180        Ok(alter_request)
181    }
182
183    pub async fn write_data(
184        &self,
185        region_id: RegionId,
186        request: RegionPutRequest,
187    ) -> Result<AffectedRows> {
188        let region_id = utils::to_data_region_id(region_id);
189        self.mito
190            .handle_request(region_id, RegionRequest::Put(request))
191            .await
192            .context(MitoWriteOperationSnafu)
193            .map(|result| result.affected_rows)
194    }
195
196    pub async fn physical_columns(
197        &self,
198        physical_region_id: RegionId,
199    ) -> Result<Vec<ColumnMetadata>> {
200        let data_region_id = utils::to_data_region_id(physical_region_id);
201        let metadata = self
202            .mito
203            .get_metadata(data_region_id)
204            .await
205            .context(MitoReadOperationSnafu)?;
206        Ok(metadata.column_metadatas.clone())
207    }
208
209    pub async fn alter_region_options(
210        &self,
211        region_id: RegionId,
212        request: RegionAlterRequest,
213    ) -> Result<AffectedRows> {
214        match request.kind {
215            AlterKind::SetRegionOptions { options: _ }
216            | AlterKind::UnsetRegionOptions { keys: _ }
217            | AlterKind::SetIndexes { options: _ }
218            | AlterKind::UnsetIndexes { options: _ }
219            | AlterKind::SyncColumns {
220                column_metadatas: _,
221            } => {
222                let region_id = utils::to_data_region_id(region_id);
223                self.mito
224                    .handle_request(region_id, RegionRequest::Alter(request))
225                    .await
226                    .context(MitoWriteOperationSnafu)
227                    .map(|result| result.affected_rows)
228            }
229            _ => {
230                info!(
231                    "Metric region received alter request {request:?} on physical region {region_id:?}"
232                );
233                FORBIDDEN_OPERATION_COUNT.inc();
234
235                ForbiddenPhysicalAlterSnafu.fail()
236            }
237        }
238    }
239}
240
241#[cfg(test)]
242mod test {
243    use common_query::prelude::{greptime_timestamp, greptime_value};
244    use datatypes::prelude::ConcreteDataType;
245    use datatypes::schema::ColumnSchema;
246
247    use super::*;
248    use crate::test_util::TestEnv;
249
250    #[tokio::test]
251    async fn test_add_columns() {
252        let env = TestEnv::new().await;
253        env.init_metric_region().await;
254
255        let current_version = env
256            .mito()
257            .get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
258            .await
259            .unwrap()
260            .schema_version;
261        // TestEnv will create a logical region which changes the version to 1.
262        assert_eq!(current_version, 1);
263
264        let new_columns = vec![
265            ColumnMetadata {
266                column_id: 0,
267                semantic_type: SemanticType::Tag,
268                column_schema: ColumnSchema::new(
269                    "tag2",
270                    ConcreteDataType::string_datatype(),
271                    false,
272                ),
273            },
274            ColumnMetadata {
275                column_id: 0,
276                semantic_type: SemanticType::Tag,
277                column_schema: ColumnSchema::new(
278                    "tag3",
279                    ConcreteDataType::string_datatype(),
280                    false,
281                ),
282            },
283        ];
284        env.data_region()
285            .add_columns(
286                env.default_physical_region_id(),
287                new_columns,
288                IndexOptions::Inverted,
289            )
290            .await
291            .unwrap();
292
293        let new_metadata = env
294            .mito()
295            .get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
296            .await
297            .unwrap();
298        let column_names = new_metadata
299            .column_metadatas
300            .iter()
301            .map(|c| &c.column_schema.name)
302            .collect::<Vec<_>>();
303        let expected = vec![
304            greptime_timestamp(),
305            greptime_value(),
306            "__table_id",
307            "__tsid",
308            "job",
309            "tag2",
310            "tag3",
311        ];
312        assert_eq!(column_names, expected);
313    }
314
315    // Only string is allowed for tag column
316    #[tokio::test]
317    async fn test_add_invalid_column() {
318        let env = TestEnv::new().await;
319        env.init_metric_region().await;
320
321        let new_columns = vec![ColumnMetadata {
322            column_id: 0,
323            semantic_type: SemanticType::Tag,
324            column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
325        }];
326        let result = env
327            .data_region()
328            .add_columns(
329                env.default_physical_region_id(),
330                new_columns,
331                IndexOptions::Inverted,
332            )
333            .await;
334        assert!(result.is_err());
335    }
336}