metric_engine/engine/
put.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::{Rows, WriteHint};
16use common_telemetry::{error, info};
17use snafu::{ensure, OptionExt};
18use store_api::codec::PrimaryKeyEncoding;
19use store_api::region_request::{AffectedRows, RegionPutRequest};
20use store_api::storage::{RegionId, TableId};
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24    ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu,
25    PhysicalRegionNotFoundSnafu, Result,
26};
27use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
28use crate::row_modifier::RowsIter;
29use crate::utils::to_data_region_id;
30
31impl MetricEngineInner {
32    /// Dispatch region put request
33    pub async fn put_region(
34        &self,
35        region_id: RegionId,
36        request: RegionPutRequest,
37    ) -> Result<AffectedRows> {
38        let is_putting_physical_region =
39            self.state.read().unwrap().exist_physical_region(region_id);
40
41        if is_putting_physical_region {
42            info!(
43                "Metric region received put request {request:?} on physical region {region_id:?}"
44            );
45            FORBIDDEN_OPERATION_COUNT.inc();
46
47            ForbiddenPhysicalAlterSnafu.fail()
48        } else {
49            self.put_logical_region(region_id, request).await
50        }
51    }
52
53    async fn put_logical_region(
54        &self,
55        logical_region_id: RegionId,
56        mut request: RegionPutRequest,
57    ) -> Result<AffectedRows> {
58        let _timer = MITO_OPERATION_ELAPSED
59            .with_label_values(&["put"])
60            .start_timer();
61
62        let (physical_region_id, data_region_id, primary_key_encoding) = {
63            let state = self.state.read().unwrap();
64            let physical_region_id = *state
65                .logical_regions()
66                .get(&logical_region_id)
67                .with_context(|| LogicalRegionNotFoundSnafu {
68                    region_id: logical_region_id,
69                })?;
70            let data_region_id = to_data_region_id(physical_region_id);
71
72            let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
73                PhysicalRegionNotFoundSnafu {
74                    region_id: data_region_id,
75                },
76            )?;
77
78            (physical_region_id, data_region_id, primary_key_encoding)
79        };
80
81        self.verify_put_request(logical_region_id, physical_region_id, &request)
82            .await?;
83
84        // write to data region
85
86        // TODO: retrieve table name
87        self.modify_rows(
88            physical_region_id,
89            logical_region_id.table_id(),
90            &mut request.rows,
91            primary_key_encoding,
92        )?;
93        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
94            request.hint = Some(WriteHint {
95                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
96            });
97        }
98        self.data_region.write_data(data_region_id, request).await
99    }
100
101    /// Verifies a put request for a logical region against its corresponding metadata region.
102    ///
103    /// Includes:
104    /// - Check if the logical region exists
105    /// - Check if the columns exist
106    async fn verify_put_request(
107        &self,
108        logical_region_id: RegionId,
109        physical_region_id: RegionId,
110        request: &RegionPutRequest,
111    ) -> Result<()> {
112        // Check if the region exists
113        let data_region_id = to_data_region_id(physical_region_id);
114        let state = self.state.read().unwrap();
115        if !state.is_logical_region_exist(logical_region_id) {
116            error!("Trying to write to an nonexistent region {logical_region_id}");
117            return LogicalRegionNotFoundSnafu {
118                region_id: logical_region_id,
119            }
120            .fail();
121        }
122
123        // Check if a physical column exists
124        let physical_columns = state
125            .physical_region_states()
126            .get(&data_region_id)
127            .context(PhysicalRegionNotFoundSnafu {
128                region_id: data_region_id,
129            })?
130            .physical_columns();
131        for col in &request.rows.schema {
132            ensure!(
133                physical_columns.contains_key(&col.column_name),
134                ColumnNotFoundSnafu {
135                    name: col.column_name.clone(),
136                    region_id: logical_region_id,
137                }
138            );
139        }
140
141        Ok(())
142    }
143
144    /// Perform metric engine specific logic to incoming rows.
145    /// - Add table_id column
146    /// - Generate tsid
147    fn modify_rows(
148        &self,
149        physical_region_id: RegionId,
150        table_id: TableId,
151        rows: &mut Rows,
152        encoding: PrimaryKeyEncoding,
153    ) -> Result<()> {
154        let input = std::mem::take(rows);
155        let iter = {
156            let state = self.state.read().unwrap();
157            let name_to_id = state
158                .physical_region_states()
159                .get(&physical_region_id)
160                .with_context(|| PhysicalRegionNotFoundSnafu {
161                    region_id: physical_region_id,
162                })?
163                .physical_columns();
164            RowsIter::new(input, name_to_id)
165        };
166        let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
167        *rows = output;
168        Ok(())
169    }
170}
171
#[cfg(test)]
mod tests {
    use common_recordbatch::RecordBatches;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::RegionRequest;
    use store_api::storage::ScanRequest;

    use super::*;
    use crate::test_util::{self, TestEnv};

    /// Wraps `rows` in a put request that carries no write hint.
    fn put_request(rows: Rows) -> RegionRequest {
        RegionRequest::Put(RegionPutRequest { rows, hint: None })
    }

    /// Scans `region_id` through the metric engine and renders all batches as a table.
    async fn scan_pretty(env: &TestEnv, region_id: RegionId) -> String {
        let stream = env
            .metric()
            .scan_to_stream(region_id, ScanRequest::default())
            .await
            .unwrap();
        RecordBatches::try_collect(stream)
            .await
            .unwrap()
            .pretty_print()
            .unwrap()
    }

    #[tokio::test]
    async fn test_write_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        // Five rows sharing one tag column.
        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["job"]),
            rows: test_util::build_rows(1, 5),
        });

        let logical_region_id = env.default_logical_region_id();
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);

        // The physical region also exposes the internal `__table_id` / `__tsid` columns.
        let expected_physical = "\
+-------------------------+----------------+------------+----------------------+-------+
| greptime_timestamp      | greptime_value | __table_id | __tsid               | job   |
+-------------------------+----------------+------------+----------------------+-------+
| 1970-01-01T00:00:00     | 0.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | 3          | 12881218023286672757 | tag_0 |
+-------------------------+----------------+------------+----------------------+-------+";
        let physical_region_id = env.default_physical_region_id();
        assert_eq!(
            expected_physical,
            scan_pretty(&env, physical_region_id).await,
            "physical region"
        );

        // The logical region only exposes its own columns.
        let expected_logical = "\
+-------------------------+----------------+-------+
| greptime_timestamp      | greptime_value | job   |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00     | 0.0            | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
+-------------------------+----------------+-------+";
        assert_eq!(
            expected_logical,
            scan_pretty(&env, logical_region_id).await,
            "logical region"
        );
    }

    #[tokio::test]
    async fn test_write_logical_region_row_count() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();
        let logical_region_id = env.default_logical_region_id();

        // Extend the logical region with three tag columns before writing.
        let tags = &["odd", "even", "Ev_En"];
        let alter = test_util::alter_logical_region_add_tag_columns(123456, tags);
        engine
            .handle_request(logical_region_id, RegionRequest::Alter(alter))
            .await
            .unwrap();

        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(tags),
            rows: test_util::build_rows(3, 100),
        });
        let response = engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(100, response.affected_rows);
    }

    #[tokio::test]
    async fn test_write_physical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Writing straight into the physical region must be rejected.
        let target = env.default_physical_region_id();
        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["abc"]),
            rows: test_util::build_rows(1, 100),
        });
        engine.handle_request(target, request).await.unwrap_err();
    }

    #[tokio::test]
    async fn test_write_nonexist_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // A region id the engine has never seen must be rejected.
        let target = RegionId::new(175, 8345);
        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["def"]),
            rows: test_util::build_rows(1, 100),
        });
        engine.handle_request(target, request).await.unwrap_err();
    }
}