1use api::v1::{Rows, WriteHint};
16use common_telemetry::{error, info};
17use snafu::{OptionExt, ensure};
18use store_api::codec::PrimaryKeyEncoding;
19use store_api::region_request::{
20 AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest,
21};
22use store_api::storage::{RegionId, TableId};
23
24use crate::engine::MetricEngineInner;
25use crate::error::{
26 ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu,
27 PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
28};
29use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
30use crate::row_modifier::RowsIter;
31use crate::utils::to_data_region_id;
32
33impl MetricEngineInner {
34 pub async fn put_region(
36 &self,
37 region_id: RegionId,
38 request: RegionPutRequest,
39 ) -> Result<AffectedRows> {
40 let is_putting_physical_region =
41 self.state.read().unwrap().exist_physical_region(region_id);
42
43 if is_putting_physical_region {
44 info!(
45 "Metric region received put request {request:?} on physical region {region_id:?}"
46 );
47 FORBIDDEN_OPERATION_COUNT.inc();
48
49 ForbiddenPhysicalAlterSnafu.fail()
50 } else {
51 self.put_logical_region(region_id, request).await
52 }
53 }
54
55 pub async fn delete_region(
57 &self,
58 region_id: RegionId,
59 request: RegionDeleteRequest,
60 ) -> Result<AffectedRows> {
61 if self.is_physical_region(region_id) {
62 info!(
63 "Metric region received delete request {request:?} on physical region {region_id:?}"
64 );
65 FORBIDDEN_OPERATION_COUNT.inc();
66
67 UnsupportedRegionRequestSnafu {
68 request: RegionRequest::Delete(request),
69 }
70 .fail()
71 } else {
72 self.delete_logical_region(region_id, request).await
73 }
74 }
75
76 async fn put_logical_region(
77 &self,
78 logical_region_id: RegionId,
79 mut request: RegionPutRequest,
80 ) -> Result<AffectedRows> {
81 let _timer = MITO_OPERATION_ELAPSED
82 .with_label_values(&["put"])
83 .start_timer();
84
85 let (physical_region_id, data_region_id, primary_key_encoding) =
86 self.find_data_region_meta(logical_region_id)?;
87
88 self.verify_rows(logical_region_id, physical_region_id, &request.rows)
89 .await?;
90
91 self.modify_rows(
94 physical_region_id,
95 logical_region_id.table_id(),
96 &mut request.rows,
97 primary_key_encoding,
98 )?;
99 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
100 request.hint = Some(WriteHint {
101 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
102 });
103 }
104 self.data_region
105 .write_data(data_region_id, RegionRequest::Put(request))
106 .await
107 }
108
109 async fn delete_logical_region(
110 &self,
111 logical_region_id: RegionId,
112 mut request: RegionDeleteRequest,
113 ) -> Result<AffectedRows> {
114 let _timer = MITO_OPERATION_ELAPSED
115 .with_label_values(&["delete"])
116 .start_timer();
117
118 let (physical_region_id, data_region_id, primary_key_encoding) =
119 self.find_data_region_meta(logical_region_id)?;
120
121 self.verify_rows(logical_region_id, physical_region_id, &request.rows)
122 .await?;
123
124 self.modify_rows(
127 physical_region_id,
128 logical_region_id.table_id(),
129 &mut request.rows,
130 primary_key_encoding,
131 )?;
132 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
133 request.hint = Some(WriteHint {
134 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
135 });
136 }
137 self.data_region
138 .write_data(data_region_id, RegionRequest::Delete(request))
139 .await
140 }
141
142 fn find_data_region_meta(
143 &self,
144 logical_region_id: RegionId,
145 ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
146 let state = self.state.read().unwrap();
147 let physical_region_id = *state
148 .logical_regions()
149 .get(&logical_region_id)
150 .with_context(|| LogicalRegionNotFoundSnafu {
151 region_id: logical_region_id,
152 })?;
153 let data_region_id = to_data_region_id(physical_region_id);
154 let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
155 PhysicalRegionNotFoundSnafu {
156 region_id: data_region_id,
157 },
158 )?;
159 Ok((physical_region_id, data_region_id, primary_key_encoding))
160 }
161
162 async fn verify_rows(
168 &self,
169 logical_region_id: RegionId,
170 physical_region_id: RegionId,
171 rows: &Rows,
172 ) -> Result<()> {
173 let data_region_id = to_data_region_id(physical_region_id);
175 let state = self.state.read().unwrap();
176 if !state.is_logical_region_exist(logical_region_id) {
177 error!("Trying to write to an nonexistent region {logical_region_id}");
178 return LogicalRegionNotFoundSnafu {
179 region_id: logical_region_id,
180 }
181 .fail();
182 }
183
184 let physical_columns = state
186 .physical_region_states()
187 .get(&data_region_id)
188 .context(PhysicalRegionNotFoundSnafu {
189 region_id: data_region_id,
190 })?
191 .physical_columns();
192 for col in &rows.schema {
193 ensure!(
194 physical_columns.contains_key(&col.column_name),
195 ColumnNotFoundSnafu {
196 name: col.column_name.clone(),
197 region_id: logical_region_id,
198 }
199 );
200 }
201
202 Ok(())
203 }
204
205 fn modify_rows(
209 &self,
210 physical_region_id: RegionId,
211 table_id: TableId,
212 rows: &mut Rows,
213 encoding: PrimaryKeyEncoding,
214 ) -> Result<()> {
215 let input = std::mem::take(rows);
216 let iter = {
217 let state = self.state.read().unwrap();
218 let name_to_id = state
219 .physical_region_states()
220 .get(&physical_region_id)
221 .with_context(|| PhysicalRegionNotFoundSnafu {
222 region_id: physical_region_id,
223 })?
224 .physical_columns();
225 RowsIter::new(input, name_to_id)
226 };
227 let output = self.row_modifier.modify_rows(iter, table_id, encoding)?;
228 *rows = output;
229 Ok(())
230 }
231}
232
#[cfg(test)]
mod tests {
    use common_recordbatch::RecordBatches;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::RegionRequest;
    use store_api::storage::ScanRequest;

    use super::*;
    use crate::test_util::{self, TestEnv};

    /// Writes five rows to the default logical region and checks what both the
    /// physical and the logical region return when scanned.
    #[tokio::test]
    async fn test_write_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        // Prepare data.
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
        });

        // Write data.
        let logical_region_id = env.default_logical_region_id();
        let result = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(result.affected_rows, 5);

        // Read data back from the physical region: the internal `__table_id`
        // and `__tsid` columns are visible here.
        let physical_region_id = env.default_physical_region_id();
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        // The table lines must start at column 0: only the first line's leading
        // whitespace is swallowed by the `\` continuation.
        let expected = "\
+-------------------------+----------------+------------+---------------------+-------+
| greptime_timestamp      | greptime_value | __table_id | __tsid              | job   |
+-------------------------+----------------+------------+---------------------+-------+
| 1970-01-01T00:00:00     | 0.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | 3          | 2955007454552897459 | tag_0 |
+-------------------------+----------------+------------+---------------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");

        // Read data back from the logical region: only the user-facing columns
        // are returned.
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let expected = "\
+-------------------------+----------------+-------+
| greptime_timestamp      | greptime_value | job   |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00     | 0.0            | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
+-------------------------+----------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "logical region");
    }

    /// After adding three tag columns to the logical region, a put of 100 rows
    /// reports 100 affected rows.
    #[tokio::test]
    async fn test_write_logical_region_row_count() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Add the tag columns used by the rows below.
        let logical_region_id = env.default_logical_region_id();
        let columns = &["odd", "even", "Ev_En"];
        let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns);
        engine
            .handle_request(logical_region_id, RegionRequest::Alter(alter_request))
            .await
            .unwrap();

        // Prepare data.
        let schema = test_util::row_schema_with_tags(columns);
        let rows = test_util::build_rows(3, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
        });

        // Write data.
        let result = engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(100, result.affected_rows);
    }

    /// A put that targets the physical region directly must be rejected.
    #[tokio::test]
    async fn test_write_physical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let schema = test_util::row_schema_with_tags(&["abc"]);
        let rows = test_util::build_rows(1, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
        });

        engine
            .handle_request(physical_region_id, request)
            .await
            .unwrap_err();
    }

    /// A put that targets a region id the engine does not know must fail.
    #[tokio::test]
    async fn test_write_nonexist_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let logical_region_id = RegionId::new(175, 8345);
        let schema = test_util::row_schema_with_tags(&["def"]);
        let rows = test_util::build_rows(1, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
        });

        engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
    }
}