metric_engine/engine/
put.rs1use api::v1::{Rows, WriteHint};
16use common_telemetry::{error, info};
17use snafu::{ensure, OptionExt};
18use store_api::codec::PrimaryKeyEncoding;
19use store_api::region_request::{AffectedRows, RegionPutRequest};
20use store_api::storage::{RegionId, TableId};
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24 ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu,
25 PhysicalRegionNotFoundSnafu, Result,
26};
27use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
28use crate::row_modifier::RowsIter;
29use crate::utils::to_data_region_id;
30
31impl MetricEngineInner {
32 pub async fn put_region(
34 &self,
35 region_id: RegionId,
36 request: RegionPutRequest,
37 ) -> Result<AffectedRows> {
38 let is_putting_physical_region =
39 self.state.read().unwrap().exist_physical_region(region_id);
40
41 if is_putting_physical_region {
42 info!(
43 "Metric region received put request {request:?} on physical region {region_id:?}"
44 );
45 FORBIDDEN_OPERATION_COUNT.inc();
46
47 ForbiddenPhysicalAlterSnafu.fail()
48 } else {
49 self.put_logical_region(region_id, request).await
50 }
51 }
52
53 async fn put_logical_region(
54 &self,
55 logical_region_id: RegionId,
56 mut request: RegionPutRequest,
57 ) -> Result<AffectedRows> {
58 let _timer = MITO_OPERATION_ELAPSED
59 .with_label_values(&["put"])
60 .start_timer();
61
62 let (physical_region_id, data_region_id, primary_key_encoding) = {
63 let state = self.state.read().unwrap();
64 let physical_region_id = *state
65 .logical_regions()
66 .get(&logical_region_id)
67 .with_context(|| LogicalRegionNotFoundSnafu {
68 region_id: logical_region_id,
69 })?;
70 let data_region_id = to_data_region_id(physical_region_id);
71
72 let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
73 PhysicalRegionNotFoundSnafu {
74 region_id: data_region_id,
75 },
76 )?;
77
78 (physical_region_id, data_region_id, primary_key_encoding)
79 };
80
81 self.verify_put_request(logical_region_id, physical_region_id, &request)
82 .await?;
83
84 self.modify_rows(
88 physical_region_id,
89 logical_region_id.table_id(),
90 &mut request.rows,
91 primary_key_encoding,
92 )?;
93 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
94 request.hint = Some(WriteHint {
95 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
96 });
97 }
98 self.data_region.write_data(data_region_id, request).await
99 }
100
101 async fn verify_put_request(
107 &self,
108 logical_region_id: RegionId,
109 physical_region_id: RegionId,
110 request: &RegionPutRequest,
111 ) -> Result<()> {
112 let data_region_id = to_data_region_id(physical_region_id);
114 let state = self.state.read().unwrap();
115 if !state.is_logical_region_exist(logical_region_id) {
116 error!("Trying to write to an nonexistent region {logical_region_id}");
117 return LogicalRegionNotFoundSnafu {
118 region_id: logical_region_id,
119 }
120 .fail();
121 }
122
123 let physical_columns = state
125 .physical_region_states()
126 .get(&data_region_id)
127 .context(PhysicalRegionNotFoundSnafu {
128 region_id: data_region_id,
129 })?
130 .physical_columns();
131 for col in &request.rows.schema {
132 ensure!(
133 physical_columns.contains_key(&col.column_name),
134 ColumnNotFoundSnafu {
135 name: col.column_name.clone(),
136 region_id: logical_region_id,
137 }
138 );
139 }
140
141 Ok(())
142 }
143
144 fn modify_rows(
148 &self,
149 physical_region_id: RegionId,
150 table_id: TableId,
151 rows: &mut Rows,
152 encoding: PrimaryKeyEncoding,
153 ) -> Result<()> {
154 let input = std::mem::take(rows);
155 let iter = {
156 let state = self.state.read().unwrap();
157 let name_to_id = state
158 .physical_region_states()
159 .get(&physical_region_id)
160 .with_context(|| PhysicalRegionNotFoundSnafu {
161 region_id: physical_region_id,
162 })?
163 .physical_columns();
164 RowsIter::new(input, name_to_id)
165 };
166 let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
167 *rows = output;
168 Ok(())
169 }
170}
171
#[cfg(test)]
mod tests {
    use common_recordbatch::RecordBatches;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::RegionRequest;
    use store_api::storage::ScanRequest;

    use super::*;
    use crate::test_util::{self, TestEnv};

    /// Wraps `rows` into a put request that carries no write hint.
    fn put_request(rows: Rows) -> RegionRequest {
        RegionRequest::Put(RegionPutRequest { rows, hint: None })
    }

    /// Scans `region_id` with a default [`ScanRequest`] and collects all batches.
    async fn scan_region(env: &TestEnv, region_id: RegionId) -> RecordBatches {
        let stream = env
            .metric()
            .scan_to_stream(region_id, ScanRequest::default())
            .await
            .unwrap();
        RecordBatches::try_collect(stream).await.unwrap()
    }

    /// Rows written through the logical region must be readable from both the physical and
    /// the logical region.
    #[tokio::test]
    async fn test_write_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        // Write five rows with a single `job` tag.
        let logical_region_id = env.default_logical_region_id();
        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["job"]),
            rows: test_util::build_rows(1, 5),
        });
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);

        // Read back from the physical region: internal columns are visible.
        let physical_region_id = env.default_physical_region_id();
        let physical_batches = scan_region(&env, physical_region_id).await;
        let expected_physical = "\
+-------------------------+----------------+------------+----------------------+-------+
| greptime_timestamp      | greptime_value | __table_id | __tsid               | job   |
+-------------------------+----------------+------------+----------------------+-------+
| 1970-01-01T00:00:00     | 0.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | 3          | 12881218023286672757 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | 3          | 12881218023286672757 | tag_0 |
+-------------------------+----------------+------------+----------------------+-------+";
        assert_eq!(
            expected_physical,
            physical_batches.pretty_print().unwrap(),
            "physical region"
        );

        // Read back from the logical region: only the user-facing columns appear.
        let logical_batches = scan_region(&env, logical_region_id).await;
        let expected_logical = "\
+-------------------------+----------------+-------+
| greptime_timestamp      | greptime_value | job   |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00     | 0.0            | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
+-------------------------+----------------+-------+";
        assert_eq!(
            expected_logical,
            logical_batches.pretty_print().unwrap(),
            "logical region"
        );
    }

    /// The affected-row count of a put equals the number of rows written.
    #[tokio::test]
    async fn test_write_logical_region_row_count() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Add the tag columns first so the subsequent put passes column verification.
        let logical_region_id = env.default_logical_region_id();
        let tag_columns = &["odd", "even", "Ev_En"];
        let alter_request =
            test_util::alter_logical_region_add_tag_columns(123456, tag_columns);
        engine
            .handle_request(logical_region_id, RegionRequest::Alter(alter_request))
            .await
            .unwrap();

        // Write 100 rows using those three tags.
        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(tag_columns),
            rows: test_util::build_rows(3, 100),
        });
        let response = engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(100, response.affected_rows);
    }

    /// A put aimed directly at the physical region must be rejected.
    #[tokio::test]
    async fn test_write_physical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["abc"]),
            rows: test_util::build_rows(1, 100),
        });

        engine
            .handle_request(physical_region_id, request)
            .await
            .unwrap_err();
    }

    /// A put aimed at a logical region that was never created must be rejected.
    #[tokio::test]
    async fn test_write_nonexist_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let unknown_region_id = RegionId::new(175, 8345);
        let request = put_request(Rows {
            schema: test_util::row_schema_with_tags(&["def"]),
            rows: test_util::build_rows(1, 100),
        });

        engine
            .handle_request(unknown_region_id, request)
            .await
            .unwrap_err();
    }
}