// metric_engine/data_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::SemanticType;
use common_telemetry::{debug, info, warn};
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
    AddColumn, AffectedRows, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ConcreteDataType, RegionId};

use crate::engine::IndexOptions;
use crate::error::{
    ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
    MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
use crate::utils;

/// A generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// handles all data-related operations across physical tables. Thus every
/// operation should be associated with a [RegionId], which is the physical
/// table id plus the region sequence. This handler transforms the region group
/// by itself.
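///
/// A minimal construction sketch (the `mito_engine`, `region_id`, and
/// `put_request` values are assumed to be provided by the caller; marked
/// `ignore` so it is not run as a doctest):
///
/// ```ignore
/// let data_region = DataRegion::new(mito_engine);
/// let affected_rows = data_region.write_data(region_id, put_request).await?;
/// ```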
pub struct DataRegion {
    mito: MitoEngine,
}

impl DataRegion {
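    /// Creates a new [DataRegion] backed by the given [MitoEngine].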
    pub fn new(mito: MitoEngine) -> Self {
        Self { mito }
    }

    /// Submits an alter request to the underlying physical region.
    ///
    /// This method changes the nullability of the given columns: a
    /// [SemanticType::Tag] column becomes nullable because it is shared
    /// between logical regions.
    ///
    /// The invoker doesn't need to set up or verify the column ids; this
    /// method assigns them based on the underlying schema.
    ///
    /// All of these adjustments are applied to the column metadata before it
    /// is assembled into the alter request.
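    ///
    /// A hypothetical call sketch (construction of `new_columns` elided;
    /// marked `ignore` so it is not run as a doctest):
    ///
    /// ```ignore
    /// data_region
    ///     .add_columns(physical_region_id, new_columns, IndexOptions::Inverted)
    ///     .await?;
    /// ```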
    pub async fn add_columns(
        &self,
        region_id: RegionId,
        columns: Vec<ColumnMetadata>,
        index_options: IndexOptions,
    ) -> Result<()> {
        // Return early if no new columns are added.
        if columns.is_empty() {
            return Ok(());
        }

        let region_id = utils::to_data_region_id(region_id);

        let num_columns = columns.len();
        let request = self
            .assemble_alter_request(region_id, columns, index_options)
            .await?;

        let _timer = MITO_DDL_DURATION.start_timer();

        let _ = self
            .mito
            .handle_request(region_id, request)
            .await
            .context(MitoWriteOperationSnafu)?;

        PHYSICAL_COLUMN_COUNT.add(num_columns as _);

        Ok(())
    }

    /// Generates a wrapped [RegionAlterRequest] from the given [ColumnMetadata].
    /// The column metadata is adjusted (column id, nullability, index options)
    /// while the request is assembled.
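    /// New column ids are assigned starting from one past the largest existing
    /// non-reserved column id in the region.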
    async fn assemble_alter_request(
        &self,
        region_id: RegionId,
        columns: Vec<ColumnMetadata>,
        index_options: IndexOptions,
    ) -> Result<RegionRequest> {
        // retrieve underlying version
        let region_metadata = self
            .mito
            .get_metadata(region_id)
            .await
            .context(MitoReadOperationSnafu)?;
        let version = region_metadata.schema_version;

        // compute the starting id for new columns: one past the largest existing, non-reserved column id
        let new_column_id_start = 1 + region_metadata
            .column_metadatas
            .iter()
            .filter_map(|c| {
                if ReservedColumnId::is_reserved(c.column_id) {
                    None
                } else {
                    Some(c.column_id)
                }
            })
            .max()
            .unwrap_or(0);

        // validate tag columns, then assign column ids and set nullability / index options
        let new_columns = columns
            .into_iter()
            .enumerate()
            .map(|(delta, mut c)| {
                if c.semantic_type == SemanticType::Tag {
                    if !c.column_schema.data_type.is_string() {
                        return ColumnTypeMismatchSnafu {
                            expect: ConcreteDataType::string_datatype(),
                            actual: c.column_schema.data_type.clone(),
                        }
                        .fail();
                    }
                } else {
                    warn!(
                        "Column {} in region {region_id} is not a tag",
                        c.column_schema.name
                    );
                };

                c.column_id = new_column_id_start + delta as u32;
                c.column_schema.set_nullable();
                match index_options {
                    IndexOptions::None => {}
                    IndexOptions::Inverted => {
                        c.column_schema.set_inverted_index(true);
                    }
                    IndexOptions::Skipping { granularity } => {
                        c.column_schema
                            .set_skipping_options(&SkippingIndexOptions {
                                granularity,
                                index_type: SkippingIndexType::BloomFilter,
                            })
                            .context(SetSkippingIndexOptionSnafu)?;
                    }
                }

                Ok(AddColumn {
                    column_metadata: c.clone(),
                    location: None,
                })
            })
            .collect::<Result<_>>()?;

        debug!("Adding columns (with assigned column ids) {new_columns:?} to region {region_id:?}");
        // assemble alter request
        let alter_request = RegionRequest::Alter(RegionAlterRequest {
            schema_version: version,
            kind: AlterKind::AddColumns {
                columns: new_columns,
            },
        });

        Ok(alter_request)
    }

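    /// Writes rows to the underlying physical region. The given [RegionId] is
    /// converted to the data region id before the put request is forwarded to
    /// mito.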
    pub async fn write_data(
        &self,
        region_id: RegionId,
        request: RegionPutRequest,
    ) -> Result<AffectedRows> {
        let region_id = utils::to_data_region_id(region_id);
        self.mito
            .handle_request(region_id, RegionRequest::Put(request))
            .await
            .context(MitoWriteOperationSnafu)
            .map(|result| result.affected_rows)
    }

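    /// Returns the column metadata of the physical data region behind
    /// `physical_region_id`.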
    pub async fn physical_columns(
        &self,
        physical_region_id: RegionId,
    ) -> Result<Vec<ColumnMetadata>> {
        let data_region_id = utils::to_data_region_id(physical_region_id);
        let metadata = self
            .mito
            .get_metadata(data_region_id)
            .await
            .context(MitoReadOperationSnafu)?;
        Ok(metadata.column_metadatas.clone())
    }

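    /// Forwards region-option alters (`SetRegionOptions` / `UnsetRegionOptions`)
    /// to the underlying physical region. Any other alter kind targeting a
    /// physical region is forbidden and returns an error.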
    pub async fn alter_region_options(
        &self,
        region_id: RegionId,
        request: RegionAlterRequest,
    ) -> Result<AffectedRows> {
        match request.kind {
            AlterKind::SetRegionOptions { options: _ }
            | AlterKind::UnsetRegionOptions { keys: _ } => {
                let region_id = utils::to_data_region_id(region_id);
                self.mito
                    .handle_request(region_id, RegionRequest::Alter(request))
                    .await
                    .context(MitoWriteOperationSnafu)
                    .map(|result| result.affected_rows)
            }
            _ => {
                info!("Metric region received alter request {request:?} on physical region {region_id:?}");
                FORBIDDEN_OPERATION_COUNT.inc();

                ForbiddenPhysicalAlterSnafu.fail()
            }
        }
    }
}

#[cfg(test)]
mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;

    #[tokio::test]
    async fn test_add_columns() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let current_version = env
            .mito()
            .get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
            .await
            .unwrap()
            .schema_version;
        // TestEnv will create a logical region which changes the version to 1.
        assert_eq!(current_version, 1);

        let new_columns = vec![
            ColumnMetadata {
                column_id: 0,
                semantic_type: SemanticType::Tag,
                column_schema: ColumnSchema::new(
                    "tag2",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
            },
            ColumnMetadata {
                column_id: 0,
                semantic_type: SemanticType::Tag,
                column_schema: ColumnSchema::new(
                    "tag3",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
            },
        ];
        env.data_region()
            .add_columns(
                env.default_physical_region_id(),
                new_columns,
                IndexOptions::Inverted,
            )
            .await
            .unwrap();

        let new_metadata = env
            .mito()
            .get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
            .await
            .unwrap();
        let column_names = new_metadata
            .column_metadatas
            .iter()
            .map(|c| &c.column_schema.name)
            .collect::<Vec<_>>();
        let expected = vec![
            "greptime_timestamp",
            "greptime_value",
            "__table_id",
            "__tsid",
            "job",
            "tag2",
            "tag3",
        ];
        assert_eq!(column_names, expected);
    }

    // Only string is allowed for tag columns
    #[tokio::test]
    async fn test_add_invalid_column() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let new_columns = vec![ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
        }];
        let result = env
            .data_region()
            .add_columns(
                env.default_physical_region_id(),
                new_columns,
                IndexOptions::Inverted,
            )
            .await;
        assert!(result.is_err());
    }
}