metric_engine/engine/
put.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::{Rows, WriteHint};
16use common_telemetry::{error, info};
17use snafu::{OptionExt, ensure};
18use store_api::codec::PrimaryKeyEncoding;
19use store_api::region_request::{
20    AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest,
21};
22use store_api::storage::{RegionId, TableId};
23
24use crate::engine::MetricEngineInner;
25use crate::error::{
26    ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu,
27    PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
28};
29use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
30use crate::row_modifier::RowsIter;
31use crate::utils::to_data_region_id;
32
33impl MetricEngineInner {
34    /// Dispatch region put request
35    pub async fn put_region(
36        &self,
37        region_id: RegionId,
38        request: RegionPutRequest,
39    ) -> Result<AffectedRows> {
40        let is_putting_physical_region =
41            self.state.read().unwrap().exist_physical_region(region_id);
42
43        if is_putting_physical_region {
44            info!(
45                "Metric region received put request {request:?} on physical region {region_id:?}"
46            );
47            FORBIDDEN_OPERATION_COUNT.inc();
48
49            ForbiddenPhysicalAlterSnafu.fail()
50        } else {
51            self.put_logical_region(region_id, request).await
52        }
53    }
54
55    /// Dispatch region delete request
56    pub async fn delete_region(
57        &self,
58        region_id: RegionId,
59        request: RegionDeleteRequest,
60    ) -> Result<AffectedRows> {
61        if self.is_physical_region(region_id) {
62            info!(
63                "Metric region received delete request {request:?} on physical region {region_id:?}"
64            );
65            FORBIDDEN_OPERATION_COUNT.inc();
66
67            UnsupportedRegionRequestSnafu {
68                request: RegionRequest::Delete(request),
69            }
70            .fail()
71        } else {
72            self.delete_logical_region(region_id, request).await
73        }
74    }
75
76    async fn put_logical_region(
77        &self,
78        logical_region_id: RegionId,
79        mut request: RegionPutRequest,
80    ) -> Result<AffectedRows> {
81        let _timer = MITO_OPERATION_ELAPSED
82            .with_label_values(&["put"])
83            .start_timer();
84
85        let (physical_region_id, data_region_id, primary_key_encoding) =
86            self.find_data_region_meta(logical_region_id)?;
87
88        self.verify_rows(logical_region_id, physical_region_id, &request.rows)
89            .await?;
90
91        // write to data region
92        // TODO: retrieve table name
93        self.modify_rows(
94            physical_region_id,
95            logical_region_id.table_id(),
96            &mut request.rows,
97            primary_key_encoding,
98        )?;
99        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
100            request.hint = Some(WriteHint {
101                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
102            });
103        }
104        self.data_region
105            .write_data(data_region_id, RegionRequest::Put(request))
106            .await
107    }
108
109    async fn delete_logical_region(
110        &self,
111        logical_region_id: RegionId,
112        mut request: RegionDeleteRequest,
113    ) -> Result<AffectedRows> {
114        let _timer = MITO_OPERATION_ELAPSED
115            .with_label_values(&["delete"])
116            .start_timer();
117
118        let (physical_region_id, data_region_id, primary_key_encoding) =
119            self.find_data_region_meta(logical_region_id)?;
120
121        self.verify_rows(logical_region_id, physical_region_id, &request.rows)
122            .await?;
123
124        // write to data region
125        // TODO: retrieve table name
126        self.modify_rows(
127            physical_region_id,
128            logical_region_id.table_id(),
129            &mut request.rows,
130            primary_key_encoding,
131        )?;
132        if primary_key_encoding == PrimaryKeyEncoding::Sparse {
133            request.hint = Some(WriteHint {
134                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
135            });
136        }
137        self.data_region
138            .write_data(data_region_id, RegionRequest::Delete(request))
139            .await
140    }
141
142    fn find_data_region_meta(
143        &self,
144        logical_region_id: RegionId,
145    ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
146        let state = self.state.read().unwrap();
147        let physical_region_id = *state
148            .logical_regions()
149            .get(&logical_region_id)
150            .with_context(|| LogicalRegionNotFoundSnafu {
151                region_id: logical_region_id,
152            })?;
153        let data_region_id = to_data_region_id(physical_region_id);
154        let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
155            PhysicalRegionNotFoundSnafu {
156                region_id: data_region_id,
157            },
158        )?;
159        Ok((physical_region_id, data_region_id, primary_key_encoding))
160    }
161
162    /// Verifies a request for a logical region against its corresponding metadata region.
163    ///
164    /// Includes:
165    /// - Check if the logical region exists
166    /// - Check if the columns exist
167    async fn verify_rows(
168        &self,
169        logical_region_id: RegionId,
170        physical_region_id: RegionId,
171        rows: &Rows,
172    ) -> Result<()> {
173        // Check if the region exists
174        let data_region_id = to_data_region_id(physical_region_id);
175        let state = self.state.read().unwrap();
176        if !state.is_logical_region_exist(logical_region_id) {
177            error!("Trying to write to an nonexistent region {logical_region_id}");
178            return LogicalRegionNotFoundSnafu {
179                region_id: logical_region_id,
180            }
181            .fail();
182        }
183
184        // Check if a physical column exists
185        let physical_columns = state
186            .physical_region_states()
187            .get(&data_region_id)
188            .context(PhysicalRegionNotFoundSnafu {
189                region_id: data_region_id,
190            })?
191            .physical_columns();
192        for col in &rows.schema {
193            ensure!(
194                physical_columns.contains_key(&col.column_name),
195                ColumnNotFoundSnafu {
196                    name: col.column_name.clone(),
197                    region_id: logical_region_id,
198                }
199            );
200        }
201
202        Ok(())
203    }
204
205    /// Perform metric engine specific logic to incoming rows.
206    /// - Add table_id column
207    /// - Generate tsid
208    fn modify_rows(
209        &self,
210        physical_region_id: RegionId,
211        table_id: TableId,
212        rows: &mut Rows,
213        encoding: PrimaryKeyEncoding,
214    ) -> Result<()> {
215        let input = std::mem::take(rows);
216        let iter = {
217            let state = self.state.read().unwrap();
218            let name_to_id = state
219                .physical_region_states()
220                .get(&physical_region_id)
221                .with_context(|| PhysicalRegionNotFoundSnafu {
222                    region_id: physical_region_id,
223                })?
224                .physical_columns();
225            RowsIter::new(input, name_to_id)
226        };
227        let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
228        *rows = output;
229        Ok(())
230    }
231}
232
#[cfg(test)]
mod tests {
    use common_recordbatch::RecordBatches;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::RegionRequest;
    use store_api::storage::ScanRequest;

    use super::*;
    use crate::test_util::{self, TestEnv};

    /// Wraps `rows` into a put request that carries no write hint.
    fn put_request(rows: Rows) -> RegionRequest {
        RegionRequest::Put(RegionPutRequest { rows, hint: None })
    }

    /// Scans `region_id` with a default scan request and renders the result as a table.
    async fn scan_pretty(env: &TestEnv, region_id: RegionId) -> String {
        let stream = env
            .metric()
            .scan_to_stream(region_id, ScanRequest::default())
            .await
            .unwrap();
        let collected = RecordBatches::try_collect(stream).await.unwrap();
        collected.pretty_print().unwrap()
    }

    #[tokio::test]
    async fn test_write_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        // Write rows tagged with a single `job` column into the logical region.
        let put = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["job"]),
            rows: test_util::build_rows(1, 5),
        });
        let logical_id = env.default_logical_region_id();
        let response = env
            .metric()
            .handle_request(logical_id, put)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);

        // The physical region also shows the internal `__table_id` / `__tsid` columns.
        let physical_id = env.default_physical_region_id();
        let physical_table = scan_pretty(&env, physical_id).await;
        let expected_physical = "\
+-------------------------+----------------+------------+---------------------+-------+
| greptime_timestamp      | greptime_value | __table_id | __tsid              | job   |
+-------------------------+----------------+------------+---------------------+-------+
| 1970-01-01T00:00:00     | 0.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | 3          | 2955007454552897459 | tag_0 |
+-------------------------+----------------+------------+---------------------+-------+";
        assert_eq!(expected_physical, physical_table, "physical region");

        // The logical region only shows the user-facing columns.
        let logical_table = scan_pretty(&env, logical_id).await;
        let expected_logical = "\
+-------------------------+----------------+-------+
| greptime_timestamp      | greptime_value | job   |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00     | 0.0            | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
+-------------------------+----------------+-------+";
        assert_eq!(expected_logical, logical_table, "logical region");
    }

    #[tokio::test]
    async fn test_write_logical_region_row_count() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Extend the logical region with three extra tag columns.
        let logical_id = env.default_logical_region_id();
        let tags = &["odd", "even", "Ev_En"];
        let alter = test_util::alter_logical_region_add_tag_columns(123456, tags);
        engine
            .handle_request(logical_id, RegionRequest::Alter(alter))
            .await
            .unwrap();

        // Write rows using a schema that contains those tags.
        let put = put_request(Rows {
            schema: test_util::row_schema_with_tags(tags),
            rows: test_util::build_rows(3, 100),
        });
        let response = engine.handle_request(logical_id, put).await.unwrap();
        assert_eq!(100, response.affected_rows);
    }

    #[tokio::test]
    async fn test_write_physical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Writing directly to the physical region must fail.
        let physical_id = env.default_physical_region_id();
        let put = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["abc"]),
            rows: test_util::build_rows(1, 100),
        });
        let outcome = engine.handle_request(physical_id, put).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_write_nonexist_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Writing to a region id the engine never created must fail.
        let unknown_id = RegionId::new(175, 8345);
        let put = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["def"]),
            rows: test_util::build_rows(1, 100),
        });
        let outcome = engine.handle_request(unknown_id, put).await;
        assert!(outcome.is_err());
    }
}