1use std::collections::HashMap;
16
17use api::v1::{
18 ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, Value, WriteHint,
19};
20use common_telemetry::{error, info};
21use fxhash::FxHashMap;
22use snafu::{OptionExt, ensure};
23use store_api::codec::PrimaryKeyEncoding;
24use store_api::region_request::{
25 AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest,
26};
27use store_api::storage::{RegionId, TableId};
28
29use crate::engine::MetricEngineInner;
30use crate::error::{
31 ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu,
32 LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
33};
34use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
35use crate::row_modifier::{RowsIter, TableIdInput};
36use crate::utils::to_data_region_id;
37
38impl MetricEngineInner {
39 pub async fn put_region(
41 &self,
42 region_id: RegionId,
43 request: RegionPutRequest,
44 ) -> Result<AffectedRows> {
45 let is_putting_physical_region =
46 self.state.read().unwrap().exist_physical_region(region_id);
47
48 if is_putting_physical_region {
49 info!(
50 "Metric region received put request {request:?} on physical region {region_id:?}"
51 );
52 FORBIDDEN_OPERATION_COUNT.inc();
53
54 ForbiddenPhysicalAlterSnafu.fail()
55 } else {
56 self.put_logical_region(region_id, request).await
57 }
58 }
59
60 pub async fn put_regions_batch(
70 &self,
71 requests: impl ExactSizeIterator<Item = (RegionId, RegionPutRequest)>,
72 ) -> Result<AffectedRows> {
73 let len = requests.len();
74
75 if len == 0 {
76 return Ok(0);
77 }
78
79 let _timer = MITO_OPERATION_ELAPSED
80 .with_label_values(&["put_batch"])
81 .start_timer();
82
83 if len == 1 {
85 let (logical_id, req) = requests.into_iter().next().unwrap();
86 return self.put_logical_region(logical_id, req).await;
87 }
88
89 let mut requests_per_physical: HashMap<RegionId, Vec<(RegionId, RegionPutRequest)>> =
90 HashMap::new();
91 for (logical_region_id, request) in requests {
92 let physical_region_id = self.find_physical_region_id(logical_region_id)?;
93 requests_per_physical
94 .entry(physical_region_id)
95 .or_default()
96 .push((logical_region_id, request));
97 }
98
99 let mut total_affected_rows: AffectedRows = 0;
100 for (physical_region_id, requests) in requests_per_physical {
101 let affected_rows = self
102 .put_regions_batch_single_physical(physical_region_id, requests)
103 .await?;
104 total_affected_rows += affected_rows;
105 }
106
107 Ok(total_affected_rows)
108 }
109
110 async fn put_regions_batch_single_physical(
117 &self,
118 physical_region_id: RegionId,
119 requests: Vec<(RegionId, RegionPutRequest)>,
120 ) -> Result<AffectedRows> {
121 if requests.is_empty() {
122 return Ok(0);
123 }
124
125 let data_region_id = to_data_region_id(physical_region_id);
126 let primary_key_encoding = self.get_primary_key_encoding(data_region_id)?;
127
128 self.validate_batch_requests(physical_region_id, &requests)
130 .await?;
131
132 let (merged_request, total_affected_rows) = match primary_key_encoding {
134 PrimaryKeyEncoding::Sparse => self.merge_sparse_batch(physical_region_id, requests)?,
135 PrimaryKeyEncoding::Dense => self.merge_dense_batch(data_region_id, requests)?,
136 };
137
138 self.data_region
140 .write_data(data_region_id, RegionRequest::Put(merged_request))
141 .await?;
142
143 Ok(total_affected_rows)
144 }
145
146 fn get_primary_key_encoding(&self, data_region_id: RegionId) -> Result<PrimaryKeyEncoding> {
148 let state = self.state.read().unwrap();
149 state
150 .get_primary_key_encoding(data_region_id)
151 .context(PhysicalRegionNotFoundSnafu {
152 region_id: data_region_id,
153 })
154 }
155
156 async fn validate_batch_requests(
158 &self,
159 physical_region_id: RegionId,
160 requests: &[(RegionId, RegionPutRequest)],
161 ) -> Result<()> {
162 for (logical_region_id, request) in requests {
163 self.verify_rows(*logical_region_id, physical_region_id, &request.rows)
164 .await?;
165 }
166 Ok(())
167 }
168
    /// Merges put requests targeting a sparse-encoded physical region into a
    /// single physical put request.
    ///
    /// Each request's rows are rewritten in place by `modify_rows` with
    /// sparse encoding, then concatenated. Returns the merged request and
    /// the total number of input rows (the affected-row count).
    ///
    /// # Errors
    /// Fails when two requests carry different `partition_rule_version`s.
    fn merge_sparse_batch(
        &self,
        physical_region_id: RegionId,
        requests: Vec<(RegionId, RegionPutRequest)>,
    ) -> Result<(RegionPutRequest, AffectedRows)> {
        // Pre-size the merged row buffer to avoid repeated reallocation.
        let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
        let mut merged_rows = Vec::with_capacity(total_rows);
        let mut total_affected_rows: AffectedRows = 0;
        let mut output_schema: Option<Vec<ColumnSchema>> = None;
        let mut merged_version: Option<u64> = None;

        for (logical_region_id, mut request) in requests {
            // Requests that carry a partition rule version must all agree on
            // it; requests without a version are accepted as-is.
            if let Some(request_version) = request.partition_rule_version {
                if let Some(merged_version) = merged_version {
                    ensure!(
                        merged_version == request_version,
                        InvalidRequestSnafu {
                            region_id: physical_region_id,
                            reason: "inconsistent partition rule version in batch"
                        }
                    );
                } else {
                    merged_version = Some(request_version);
                }
            }
            // Rewrite the rows into the physical sparse layout in place.
            self.modify_rows(
                physical_region_id,
                logical_region_id.table_id(),
                &mut request.rows,
                PrimaryKeyEncoding::Sparse,
            )?;

            let row_count = request.rows.rows.len();
            total_affected_rows += row_count as AffectedRows;

            // The first request's (post-modification) schema is reused for the
            // merged request. NOTE(review): this assumes `modify_rows` yields
            // an identical schema for every request in the batch under sparse
            // encoding — confirm against the row modifier.
            if output_schema.is_none() {
                output_schema = Some(request.rows.schema.clone());
            }

            merged_rows.extend(request.rows.rows);
        }

        // The caller only invokes this with a non-empty batch, so the loop
        // ran at least once; panics on an empty `requests`.
        let schema = output_schema.unwrap();

        let merged_request = RegionPutRequest {
            rows: Rows {
                schema,
                rows: merged_rows,
            },
            // Tell the data region the rows are already sparse-encoded.
            hint: Some(WriteHint {
                primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
            }),
            partition_rule_version: merged_version,
        };

        Ok((merged_request, total_affected_rows))
    }
230
    /// Merges put requests targeting a dense-encoded physical region into a
    /// single physical put request.
    ///
    /// Builds the union of all request schemas, aligns every row to that
    /// schema (missing columns become null), then runs the row modifier once
    /// over the combined rows with a per-row table id.
    fn merge_dense_batch(
        &self,
        data_region_id: RegionId,
        requests: Vec<(RegionId, RegionPutRequest)>,
    ) -> Result<(RegionPutRequest, AffectedRows)> {
        let merged_schema = Self::build_union_schema(&requests);

        // `table_ids` holds one entry per merged row, so its length equals
        // the total affected-row count returned below.
        let (merged_rows, table_ids, merged_version) =
            Self::align_requests_to_schema(requests, &merged_schema)?;

        let final_rows = {
            // Scope the state read-lock to building and consuming the iterator.
            let state = self.state.read().unwrap();
            let name_to_id = state
                .physical_region_states()
                .get(&data_region_id)
                .with_context(|| PhysicalRegionNotFoundSnafu {
                    region_id: data_region_id,
                })?
                .physical_columns();

            let iter = RowsIter::new(
                Rows {
                    schema: merged_schema,
                    rows: merged_rows,
                },
                name_to_id,
            );

            self.row_modifier.modify_rows(
                iter,
                TableIdInput::Batch(&table_ids),
                PrimaryKeyEncoding::Dense,
            )?
        };

        let merged_request = RegionPutRequest {
            rows: final_rows,
            // Dense encoding needs no write hint.
            hint: None,
            partition_rule_version: merged_version,
        };

        Ok((merged_request, table_ids.len() as AffectedRows))
    }
282
283 fn build_union_schema(requests: &[(RegionId, RegionPutRequest)]) -> Vec<ColumnSchema> {
285 let mut schema_map: HashMap<&str, ColumnSchema> = HashMap::new();
286 for (_, request) in requests {
287 for col in &request.rows.schema {
288 schema_map
289 .entry(col.column_name.as_str())
290 .or_insert_with(|| col.clone());
291 }
292 }
293 schema_map.into_values().collect()
294 }
295
    /// Aligns the rows of every request to `merged_schema`.
    ///
    /// Returns the aligned rows, a parallel vector with the table id of the
    /// logical region each row came from, and the consistent partition rule
    /// version found in the batch, if any.
    ///
    /// # Errors
    /// Fails when two requests carry different `partition_rule_version`s.
    fn align_requests_to_schema(
        requests: Vec<(RegionId, RegionPutRequest)>,
        merged_schema: &[ColumnSchema],
    ) -> Result<(Vec<Row>, Vec<TableId>, Option<u64>)> {
        let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
        let mut merged_rows = Vec::with_capacity(total_rows);
        let mut table_ids = Vec::with_capacity(total_rows);
        let mut merged_version: Option<u64> = None;

        // Placeholder for columns a request's own schema does not contain.
        let null_value = Value { value_data: None };

        for (logical_region_id, request) in requests {
            // Same version consistency rule as the sparse merge path:
            // versioned requests must agree, unversioned ones are accepted.
            if let Some(request_version) = request.partition_rule_version {
                if let Some(merged_version) = merged_version {
                    ensure!(
                        merged_version == request_version,
                        InvalidRequestSnafu {
                            region_id: logical_region_id,
                            reason: "inconsistent partition rule version in batch"
                        }
                    );
                } else {
                    merged_version = Some(request_version);
                }
            }
            let table_id = logical_region_id.table_id();

            // Column name -> index within this request's own schema.
            let col_name_to_idx: FxHashMap<&str, usize> = request
                .rows
                .schema
                .iter()
                .enumerate()
                .map(|(idx, col)| (col.column_name.as_str(), idx))
                .collect();

            // For each merged column, the source index in this request's rows
            // (None when the request lacks that column).
            let col_mapping: Vec<Option<usize>> = merged_schema
                .iter()
                .map(|merged_col| {
                    col_name_to_idx
                        .get(merged_col.column_name.as_str())
                        .copied()
                })
                .collect();

            for mut row in request.rows.rows {
                let mut aligned_values = Vec::with_capacity(merged_schema.len());
                for &opt_idx in &col_mapping {
                    aligned_values.push(match opt_idx {
                        // Move the value out instead of cloning; the source
                        // row is discarded after this loop. Assumes each row
                        // has one value per schema column — indexing panics
                        // otherwise (TODO confirm upstream guarantees this).
                        Some(idx) => std::mem::take(&mut row.values[idx]),
                        None => null_value.clone(),
                    });
                }
                merged_rows.push(Row {
                    values: aligned_values,
                });
                table_ids.push(table_id);
            }
        }

        Ok((merged_rows, table_ids, merged_version))
    }
363
364 fn find_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
366 let state = self.state.read().unwrap();
367 state
368 .logical_regions()
369 .get(&logical_region_id)
370 .copied()
371 .context(LogicalRegionNotFoundSnafu {
372 region_id: logical_region_id,
373 })
374 }
375
376 pub async fn delete_region(
378 &self,
379 region_id: RegionId,
380 request: RegionDeleteRequest,
381 ) -> Result<AffectedRows> {
382 if self.is_physical_region(region_id) {
383 info!(
384 "Metric region received delete request {request:?} on physical region {region_id:?}"
385 );
386 FORBIDDEN_OPERATION_COUNT.inc();
387
388 UnsupportedRegionRequestSnafu {
389 request: RegionRequest::Delete(request),
390 }
391 .fail()
392 } else {
393 self.delete_logical_region(region_id, request).await
394 }
395 }
396
397 async fn put_logical_region(
398 &self,
399 logical_region_id: RegionId,
400 mut request: RegionPutRequest,
401 ) -> Result<AffectedRows> {
402 let _timer = MITO_OPERATION_ELAPSED
403 .with_label_values(&["put"])
404 .start_timer();
405
406 let (physical_region_id, data_region_id, primary_key_encoding) =
407 self.find_data_region_meta(logical_region_id)?;
408
409 self.verify_rows(logical_region_id, physical_region_id, &request.rows)
410 .await?;
411
412 self.modify_rows(
415 physical_region_id,
416 logical_region_id.table_id(),
417 &mut request.rows,
418 primary_key_encoding,
419 )?;
420 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
421 request.hint = Some(WriteHint {
422 primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
423 });
424 }
425 self.data_region
426 .write_data(data_region_id, RegionRequest::Put(request))
427 .await
428 }
429
430 async fn delete_logical_region(
431 &self,
432 logical_region_id: RegionId,
433 mut request: RegionDeleteRequest,
434 ) -> Result<AffectedRows> {
435 let _timer = MITO_OPERATION_ELAPSED
436 .with_label_values(&["delete"])
437 .start_timer();
438
439 let (physical_region_id, data_region_id, primary_key_encoding) =
440 self.find_data_region_meta(logical_region_id)?;
441
442 self.verify_rows(logical_region_id, physical_region_id, &request.rows)
443 .await?;
444
445 self.modify_rows(
448 physical_region_id,
449 logical_region_id.table_id(),
450 &mut request.rows,
451 primary_key_encoding,
452 )?;
453 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
454 request.hint = Some(WriteHint {
455 primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
456 });
457 }
458 self.data_region
459 .write_data(data_region_id, RegionRequest::Delete(request))
460 .await
461 }
462
463 fn find_data_region_meta(
464 &self,
465 logical_region_id: RegionId,
466 ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
467 let state = self.state.read().unwrap();
468 let physical_region_id = *state
469 .logical_regions()
470 .get(&logical_region_id)
471 .with_context(|| LogicalRegionNotFoundSnafu {
472 region_id: logical_region_id,
473 })?;
474 let data_region_id = to_data_region_id(physical_region_id);
475 let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
476 PhysicalRegionNotFoundSnafu {
477 region_id: data_region_id,
478 },
479 )?;
480 Ok((physical_region_id, data_region_id, primary_key_encoding))
481 }
482
483 async fn verify_rows(
489 &self,
490 logical_region_id: RegionId,
491 physical_region_id: RegionId,
492 rows: &Rows,
493 ) -> Result<()> {
494 let data_region_id = to_data_region_id(physical_region_id);
496 let state = self.state.read().unwrap();
497 if !state.is_logical_region_exist(logical_region_id) {
498 error!("Trying to write to an nonexistent region {logical_region_id}");
499 return LogicalRegionNotFoundSnafu {
500 region_id: logical_region_id,
501 }
502 .fail();
503 }
504
505 let physical_columns = state
507 .physical_region_states()
508 .get(&data_region_id)
509 .context(PhysicalRegionNotFoundSnafu {
510 region_id: data_region_id,
511 })?
512 .physical_columns();
513 for col in &rows.schema {
514 ensure!(
515 physical_columns.contains_key(&col.column_name),
516 ColumnNotFoundSnafu {
517 name: col.column_name.clone(),
518 region_id: logical_region_id,
519 }
520 );
521 }
522
523 Ok(())
524 }
525
    /// Rewrites `rows` in place from the logical layout to the physical
    /// layout of `physical_region_id` using the given primary key encoding.
    fn modify_rows(
        &self,
        physical_region_id: RegionId,
        table_id: TableId,
        rows: &mut Rows,
        encoding: PrimaryKeyEncoding,
    ) -> Result<()> {
        // Move the rows out (leaving an empty `Rows` behind) so the iterator
        // can consume them without cloning.
        let input = std::mem::take(rows);
        let iter = {
            // Scope the state read-lock to the construction of the iterator.
            let state = self.state.read().unwrap();
            let name_to_id = state
                .physical_region_states()
                .get(&physical_region_id)
                .with_context(|| PhysicalRegionNotFoundSnafu {
                    region_id: physical_region_id,
                })?
                .physical_columns();
            RowsIter::new(input, name_to_id)
        };
        let output =
            self.row_modifier
                .modify_rows(iter, TableIdInput::Single(table_id), encoding)?;
        // Replace the caller's (now-empty) rows with the modified output.
        *rows = output;
        Ok(())
    }
554}
555
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_function::utils::partition_rule_version;
    use common_recordbatch::RecordBatches;
    use datatypes::value::Value as PartitionValue;
    use partition::expr::col;
    use store_api::metric_engine_consts::{
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
        MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
    };
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{EnterStagingRequest, RegionRequest};
    use store_api::storage::ScanRequest;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::test_util::{self, TestEnv};

    // Asserts the merged request's column set matches the expectation for
    // the given primary key encoding (sparse collapses tags into the
    // encoded primary key column; dense keeps them plus internal columns).
    fn assert_merged_schema(rows: &Rows, expect_sparse: bool) {
        let column_names: HashSet<String> = rows
            .schema
            .iter()
            .map(|col| col.column_name.clone())
            .collect();

        if expect_sparse {
            assert!(
                column_names.contains(PRIMARY_KEY_COLUMN_NAME),
                "sparse encoding should include primary key column"
            );
            assert!(
                !column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
                "sparse encoding should not include table id column"
            );
            assert!(
                !column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
                "sparse encoding should not include tsid column"
            );
            assert!(
                !column_names.contains("job"),
                "sparse encoding should not include tag columns"
            );
            assert!(
                !column_names.contains("instance"),
                "sparse encoding should not include tag columns"
            );
        } else {
            assert!(
                !column_names.contains(PRIMARY_KEY_COLUMN_NAME),
                "dense encoding should not include primary key column"
            );
            assert!(
                column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
                "dense encoding should include table id column"
            );
            assert!(
                column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
                "dense encoding should include tsid column"
            );
            assert!(
                column_names.contains("job"),
                "dense encoding should keep tag columns"
            );
            assert!(
                column_names.contains("instance"),
                "dense encoding should keep tag columns"
            );
        }
    }

    // Builds a JSON partition expression on the "job" tag, used to drive the
    // staging partition-rule-version checks.
    fn job_partition_expr_json() -> String {
        let expr = col("job")
            .gt_eq(PartitionValue::String("job-0".into()))
            .and(col("job").lt(PartitionValue::String("job-9".into())));
        expr.as_json_str().unwrap()
    }

    // Creates a logical region on `physical_region_id` with the given tag
    // columns.
    async fn create_logical_region_with_tags(
        env: &TestEnv,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        tags: &[&str],
    ) {
        let region_create_request = test_util::create_logical_region_request(
            tags,
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Create(region_create_request),
            )
            .await
            .unwrap();
    }

    // Shared driver for the sparse/dense batch-write tests: creates two
    // logical regions with different tag sets, checks the merge output for
    // the expected encoding, then performs the batch write and verifies the
    // row count both from the return value and by scanning the data region.
    async fn run_batch_write_with_schema_variants(
        env: &TestEnv,
        physical_region_id: RegionId,
        options: Vec<(String, String)>,
        expect_sparse: bool,
    ) {
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), options)
            .await;

        let logical_region_1 = env.default_logical_region_id();
        let logical_region_2 = RegionId::new(1024, 1);

        create_logical_region_with_tags(env, physical_region_id, logical_region_1, &["job"]).await;
        create_logical_region_with_tags(
            env,
            physical_region_id,
            logical_region_2,
            &["job", "instance"],
        )
        .await;

        let schema_1 = test_util::row_schema_with_tags(&["job"]);
        let schema_2 = test_util::row_schema_with_tags(&["job", "instance"]);

        let data_region_id = RegionId::new(physical_region_id.table_id(), 2);
        let primary_key_encoding = env
            .metric()
            .inner
            .get_primary_key_encoding(data_region_id)
            .unwrap();
        assert_eq!(
            primary_key_encoding,
            if expect_sparse {
                PrimaryKeyEncoding::Sparse
            } else {
                PrimaryKeyEncoding::Dense
            }
        );

        // 3 rows for region 1 + 2 rows for region 2 = 5 rows in total.
        let build_requests = || {
            let rows_1 = test_util::build_rows(1, 3);
            let rows_2 = test_util::build_rows(2, 2);

            vec![
                (
                    logical_region_1,
                    RegionPutRequest {
                        rows: Rows {
                            schema: schema_1.clone(),
                            rows: rows_1,
                        },
                        hint: None,
                        partition_rule_version: None,
                    },
                ),
                (
                    logical_region_2,
                    RegionPutRequest {
                        rows: Rows {
                            schema: schema_2.clone(),
                            rows: rows_2,
                        },
                        hint: None,
                        partition_rule_version: None,
                    },
                ),
            ]
        };

        let merged_request = if expect_sparse {
            let (merged_request, _) = env
                .metric()
                .inner
                .merge_sparse_batch(physical_region_id, build_requests())
                .unwrap();
            // Sparse merges must carry the sparse write hint.
            let hint = merged_request
                .hint
                .as_ref()
                .expect("missing sparse write hint");
            assert_eq!(
                hint.primary_key_encoding,
                PrimaryKeyEncodingProto::Sparse as i32
            );
            merged_request
        } else {
            let (merged_request, _) = env
                .metric()
                .inner
                .merge_dense_batch(data_region_id, build_requests())
                .unwrap();
            // Dense merges carry no write hint.
            assert!(merged_request.hint.is_none());
            merged_request
        };

        assert_merged_schema(&merged_request.rows, expect_sparse);

        let affected_rows = env
            .metric()
            .inner
            .put_regions_batch(build_requests().into_iter())
            .await
            .unwrap();
        assert_eq!(affected_rows, 5);

        // Scan the data region to confirm the rows were actually persisted.
        let request = ScanRequest::default();
        let stream = env
            .mito()
            .scan_to_stream(data_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 5);
    }

    // Writes to a logical region and verifies both the physical view (with
    // internal __table_id/__tsid columns) and the logical view of the data.
    #[tokio::test]
    async fn test_write_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_rule_version: None,
        });

        let logical_region_id = env.default_logical_region_id();
        let result = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(result.affected_rows, 5);

        let physical_region_id = env.default_physical_region_id();
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let expected = "\
+-------------------------+----------------+------------+---------------------+-------+
| greptime_timestamp      | greptime_value | __table_id | __tsid              | job   |
+-------------------------+----------------+------------+---------------------+-------+
| 1970-01-01T00:00:00     | 0.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | 3          | 2955007454552897459 | tag_0 |
+-------------------------+----------------+------------+---------------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");

        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let expected = "\
+-------------------------+----------------+-------+
| greptime_timestamp      | greptime_value | job   |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00     | 0.0            | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
+-------------------------+----------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "logical region");
    }

    // Writes 100 rows after altering the logical region and checks the
    // reported affected-row count.
    #[tokio::test]
    async fn test_write_logical_region_row_count() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let logical_region_id = env.default_logical_region_id();
        let columns = &["odd", "even", "Ev_En"];
        let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns);
        engine
            .handle_request(logical_region_id, RegionRequest::Alter(alter_request))
            .await
            .unwrap();

        let schema = test_util::row_schema_with_tags(columns);
        let rows = test_util::build_rows(3, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_rule_version: None,
        });

        let result = engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(100, result.affected_rows);
    }

    // Direct puts to a physical region must be rejected.
    #[tokio::test]
    async fn test_write_physical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let schema = test_util::row_schema_with_tags(&["abc"]);
        let rows = test_util::build_rows(1, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_rule_version: None,
        });

        engine
            .handle_request(physical_region_id, request)
            .await
            .unwrap_err();
    }

    // Puts to an unknown logical region must fail.
    #[tokio::test]
    async fn test_write_nonexist_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let logical_region_id = RegionId::new(175, 8345);
        let schema = test_util::row_schema_with_tags(&["def"]);
        let rows = test_util::build_rows(1, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_rule_version: None,
        });

        engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
    }

    // Batch-writes three logical regions (same physical region) in one call
    // and checks the combined affected-row count and physical scan.
    #[tokio::test]
    async fn test_batch_write_multiple_logical_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let logical_region_1 = env.default_logical_region_id();
        let logical_region_2 = RegionId::new(1024, 1);
        let logical_region_3 = RegionId::new(1024, 2);

        env.create_logical_region(physical_region_id, logical_region_2)
            .await;
        env.create_logical_region(physical_region_id, logical_region_3)
            .await;

        let schema = test_util::row_schema_with_tags(&["job"]);

        let rows1 = test_util::build_rows(1, 3);
        let mut rows2 = test_util::build_rows(1, 2);
        let mut rows3 = test_util::build_rows(1, 5);

        // Shift the timestamps of rows2/rows3 so the three requests do not
        // overwrite each other's rows (same tag values otherwise).
        use api::v1::value::ValueData;
        for (i, row) in rows2.iter_mut().enumerate() {
            if let Some(ValueData::TimestampMillisecondValue(ts)) =
                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
            {
                *ts = (10 + i) as i64;
            }
        }
        for (i, row) in rows3.iter_mut().enumerate() {
            if let Some(ValueData::TimestampMillisecondValue(ts)) =
                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
            {
                *ts = (20 + i) as i64;
            }
        }

        let requests = vec![
            (
                logical_region_1,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: rows1,
                    },
                    hint: None,
                    partition_rule_version: None,
                },
            ),
            (
                logical_region_2,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: rows2,
                    },
                    hint: None,
                    partition_rule_version: None,
                },
            ),
            (
                logical_region_3,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: rows3,
                    },
                    hint: None,
                    partition_rule_version: None,
                },
            ),
        ];

        let affected_rows = engine
            .inner
            .put_regions_batch(requests.into_iter())
            .await
            .unwrap();
        assert_eq!(affected_rows, 10);

        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 10);
    }

    // A batch containing a nonexistent region must fail as a whole: nothing
    // is written for the valid requests either.
    #[tokio::test]
    async fn test_batch_write_with_partial_failure() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let logical_region_1 = env.default_logical_region_id();
        let logical_region_2 = RegionId::new(1024, 1);
        let nonexistent_region = RegionId::new(9999, 9999);

        env.create_logical_region(physical_region_id, logical_region_2)
            .await;

        let schema = test_util::row_schema_with_tags(&["job"]);
        let requests = vec![
            (
                logical_region_1,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: test_util::build_rows(1, 3),
                    },
                    hint: None,
                    partition_rule_version: None,
                },
            ),
            (
                nonexistent_region,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: test_util::build_rows(1, 2),
                    },
                    hint: None,
                    partition_rule_version: None,
                },
            ),
            (
                logical_region_2,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: test_util::build_rows(1, 5),
                    },
                    hint: None,
                    partition_rule_version: None,
                },
            ),
        ];

        let result = engine.inner.put_regions_batch(requests.into_iter()).await;
        assert!(result.is_err());

        // No rows should have been written.
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
    }

    // A single-element batch takes the fast path through put_logical_region.
    #[tokio::test]
    async fn test_batch_write_single_request_fast_path() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let logical_region_id = env.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);

        let requests = vec![(
            logical_region_id,
            RegionPutRequest {
                rows: Rows {
                    schema,
                    rows: test_util::build_rows(1, 5),
                },
                hint: None,
                partition_rule_version: None,
            },
        )];

        let affected_rows = engine
            .inner
            .put_regions_batch(requests.into_iter())
            .await
            .unwrap();
        assert_eq!(affected_rows, 5);
    }

    // An empty batch is a no-op returning zero affected rows.
    #[tokio::test]
    async fn test_batch_write_empty_requests() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let requests = vec![];
        let affected_rows = engine
            .inner
            .put_regions_batch(requests.into_iter())
            .await
            .unwrap();

        assert_eq!(affected_rows, 0);
    }

    // Batch write against a physical region configured for sparse encoding.
    #[tokio::test]
    async fn test_batch_write_sparse_encoding() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();

        run_batch_write_with_schema_variants(
            &env,
            physical_region_id,
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "sparse".to_string(),
            )],
            true,
        )
        .await;
    }

    // Batch write against a physical region configured for dense encoding.
    #[tokio::test]
    async fn test_batch_write_dense_encoding() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();

        run_batch_write_with_schema_variants(
            &env,
            physical_region_id,
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "dense".to_string(),
            )],
            false,
        )
        .await;
    }

    // A put carrying a partition rule version while the region has none must
    // be rejected with InvalidArguments.
    #[tokio::test]
    async fn test_metric_put_rejects_bad_partition_rule_version() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let rows = Rows {
            schema: test_util::row_schema_with_tags(&["job"]),
            rows: test_util::build_rows(1, 3),
        };

        let err = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows,
                    hint: None,
                    partition_rule_version: Some(1),
                }),
            )
            .await
            .unwrap_err();

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    // After entering staging with a partition expression, puts with a
    // mismatching version are rejected while puts with no version or the
    // matching version succeed.
    #[tokio::test]
    async fn test_metric_put_respects_staging_partition_rule_version() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();
        let partition_expr = job_partition_expr_json();
        env.metric()
            .handle_request(
                physical_region_id,
                RegionRequest::EnterStaging(EnterStagingRequest {
                    partition_expr: partition_expr.clone(),
                }),
            )
            .await
            .unwrap();

        let expected_version = partition_rule_version(Some(&partition_expr));
        let rows = Rows {
            schema: test_util::row_schema_with_tags(&["job"]),
            rows: test_util::build_rows(1, 3),
        };

        let err = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: rows.clone(),
                    hint: None,
                    partition_rule_version: Some(expected_version.wrapping_add(1)),
                }),
            )
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);

        let response = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: rows.clone(),
                    hint: None,
                    partition_rule_version: None,
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let response = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows,
                    hint: None,
                    partition_rule_version: Some(expected_version),
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);
    }
}