1use std::collections::HashMap;
16
17use api::v1::{
18 ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, Value, WriteHint,
19};
20use common_telemetry::{error, info};
21use fxhash::FxHashMap;
22use snafu::{OptionExt, ensure};
23use store_api::codec::PrimaryKeyEncoding;
24use store_api::region_request::{
25 AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest,
26};
27use store_api::storage::{RegionId, TableId};
28
29use crate::engine::MetricEngineInner;
30use crate::error::{
31 ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu,
32 LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
33};
34use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
35use crate::row_modifier::{RowsIter, TableIdInput};
36use crate::utils::to_data_region_id;
37
38impl MetricEngineInner {
39 pub async fn put_region(
41 &self,
42 region_id: RegionId,
43 request: RegionPutRequest,
44 ) -> Result<AffectedRows> {
45 let is_putting_physical_region =
46 self.state.read().unwrap().exist_physical_region(region_id);
47
48 if is_putting_physical_region {
49 info!(
50 "Metric region received put request {request:?} on physical region {region_id:?}"
51 );
52 FORBIDDEN_OPERATION_COUNT.inc();
53
54 ForbiddenPhysicalAlterSnafu.fail()
55 } else {
56 self.put_logical_region(region_id, request).await
57 }
58 }
59
60 pub async fn put_regions_batch(
70 &self,
71 requests: impl ExactSizeIterator<Item = (RegionId, RegionPutRequest)>,
72 ) -> Result<AffectedRows> {
73 let len = requests.len();
74
75 if len == 0 {
76 return Ok(0);
77 }
78
79 let _timer = MITO_OPERATION_ELAPSED
80 .with_label_values(&["put_batch"])
81 .start_timer();
82
83 if len == 1 {
85 let (logical_id, req) = requests.into_iter().next().unwrap();
86 return self.put_logical_region(logical_id, req).await;
87 }
88
89 let mut requests_per_physical: HashMap<RegionId, Vec<(RegionId, RegionPutRequest)>> =
90 HashMap::new();
91 for (logical_region_id, request) in requests {
92 let physical_region_id = self.find_physical_region_id(logical_region_id)?;
93 requests_per_physical
94 .entry(physical_region_id)
95 .or_default()
96 .push((logical_region_id, request));
97 }
98
99 let mut total_affected_rows: AffectedRows = 0;
100 for (physical_region_id, requests) in requests_per_physical {
101 let affected_rows = self
102 .put_regions_batch_single_physical(physical_region_id, requests)
103 .await?;
104 total_affected_rows += affected_rows;
105 }
106
107 Ok(total_affected_rows)
108 }
109
110 async fn put_regions_batch_single_physical(
117 &self,
118 physical_region_id: RegionId,
119 requests: Vec<(RegionId, RegionPutRequest)>,
120 ) -> Result<AffectedRows> {
121 if requests.is_empty() {
122 return Ok(0);
123 }
124
125 let data_region_id = to_data_region_id(physical_region_id);
126 let primary_key_encoding = self.get_primary_key_encoding(data_region_id)?;
127
128 self.validate_batch_requests(physical_region_id, &requests)
130 .await?;
131
132 let (merged_request, total_affected_rows) = match primary_key_encoding {
134 PrimaryKeyEncoding::Sparse => self.merge_sparse_batch(physical_region_id, requests)?,
135 PrimaryKeyEncoding::Dense => self.merge_dense_batch(data_region_id, requests)?,
136 };
137
138 self.data_region
140 .write_data(data_region_id, RegionRequest::Put(merged_request))
141 .await?;
142
143 Ok(total_affected_rows)
144 }
145
146 fn get_primary_key_encoding(&self, data_region_id: RegionId) -> Result<PrimaryKeyEncoding> {
148 let state = self.state.read().unwrap();
149 state
150 .get_primary_key_encoding(data_region_id)
151 .context(PhysicalRegionNotFoundSnafu {
152 region_id: data_region_id,
153 })
154 }
155
156 async fn validate_batch_requests(
158 &self,
159 physical_region_id: RegionId,
160 requests: &[(RegionId, RegionPutRequest)],
161 ) -> Result<()> {
162 for (logical_region_id, request) in requests {
163 self.verify_rows(*logical_region_id, physical_region_id, &request.rows)
164 .await?;
165 }
166 Ok(())
167 }
168
169 fn merge_sparse_batch(
171 &self,
172 physical_region_id: RegionId,
173 requests: Vec<(RegionId, RegionPutRequest)>,
174 ) -> Result<(RegionPutRequest, AffectedRows)> {
175 let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
176 let mut merged_rows = Vec::with_capacity(total_rows);
177 let mut total_affected_rows: AffectedRows = 0;
178 let mut output_schema: Option<Vec<ColumnSchema>> = None;
179 let mut merged_version: Option<u64> = None;
180
181 for (logical_region_id, mut request) in requests {
183 if let Some(request_version) = request.partition_expr_version {
184 if let Some(merged_version) = merged_version {
185 ensure!(
186 merged_version == request_version,
187 InvalidRequestSnafu {
188 region_id: physical_region_id,
189 reason: "inconsistent partition expr version in batch"
190 }
191 );
192 } else {
193 merged_version = Some(request_version);
194 }
195 }
196 self.modify_rows(
197 physical_region_id,
198 logical_region_id.table_id(),
199 &mut request.rows,
200 PrimaryKeyEncoding::Sparse,
201 )?;
202
203 let row_count = request.rows.rows.len();
204 total_affected_rows += row_count as AffectedRows;
205
206 if output_schema.is_none() {
208 output_schema = Some(request.rows.schema.clone());
209 }
210
211 merged_rows.extend(request.rows.rows);
212 }
213
214 let schema = output_schema.unwrap();
216
217 let merged_request = RegionPutRequest {
218 rows: Rows {
219 schema,
220 rows: merged_rows,
221 },
222 hint: Some(WriteHint {
223 primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
224 }),
225 partition_expr_version: merged_version,
226 };
227
228 Ok((merged_request, total_affected_rows))
229 }
230
231 fn merge_dense_batch(
237 &self,
238 data_region_id: RegionId,
239 requests: Vec<(RegionId, RegionPutRequest)>,
240 ) -> Result<(RegionPutRequest, AffectedRows)> {
241 let merged_schema = Self::build_union_schema(&requests);
243
244 let (merged_rows, table_ids, merged_version) =
246 Self::align_requests_to_schema(requests, &merged_schema)?;
247
248 let final_rows = {
250 let state = self.state.read().unwrap();
251 let name_to_id = state
252 .physical_region_states()
253 .get(&data_region_id)
254 .with_context(|| PhysicalRegionNotFoundSnafu {
255 region_id: data_region_id,
256 })?
257 .physical_columns();
258
259 let iter = RowsIter::new(
260 Rows {
261 schema: merged_schema,
262 rows: merged_rows,
263 },
264 name_to_id,
265 );
266
267 self.row_modifier.modify_rows(
268 iter,
269 TableIdInput::Batch(&table_ids),
270 PrimaryKeyEncoding::Dense,
271 )?
272 };
273
274 let merged_request = RegionPutRequest {
275 rows: final_rows,
276 hint: None,
277 partition_expr_version: merged_version,
278 };
279
280 Ok((merged_request, table_ids.len() as AffectedRows))
281 }
282
283 fn build_union_schema(requests: &[(RegionId, RegionPutRequest)]) -> Vec<ColumnSchema> {
285 let mut schema_map: HashMap<&str, ColumnSchema> = HashMap::new();
286 for (_, request) in requests {
287 for col in &request.rows.schema {
288 schema_map
289 .entry(col.column_name.as_str())
290 .or_insert_with(|| col.clone());
291 }
292 }
293 schema_map.into_values().collect()
294 }
295
296 fn align_requests_to_schema(
297 requests: Vec<(RegionId, RegionPutRequest)>,
298 merged_schema: &[ColumnSchema],
299 ) -> Result<(Vec<Row>, Vec<TableId>, Option<u64>)> {
300 let total_rows: usize = requests.iter().map(|(_, req)| req.rows.rows.len()).sum();
302 let mut merged_rows = Vec::with_capacity(total_rows);
303 let mut table_ids = Vec::with_capacity(total_rows);
304 let mut merged_version: Option<u64> = None;
305
306 let null_value = Value { value_data: None };
307
308 for (logical_region_id, request) in requests {
309 if let Some(request_version) = request.partition_expr_version {
310 if let Some(merged_version) = merged_version {
311 ensure!(
312 merged_version == request_version,
313 InvalidRequestSnafu {
314 region_id: logical_region_id,
315 reason: "inconsistent partition expr version in batch"
316 }
317 );
318 } else {
319 merged_version = Some(request_version);
320 }
321 }
322 let table_id = logical_region_id.table_id();
323
324 let col_name_to_idx: FxHashMap<&str, usize> = request
326 .rows
327 .schema
328 .iter()
329 .enumerate()
330 .map(|(idx, col)| (col.column_name.as_str(), idx))
331 .collect();
332
333 let col_mapping: Vec<Option<usize>> = merged_schema
337 .iter()
338 .map(|merged_col| {
339 col_name_to_idx
340 .get(merged_col.column_name.as_str())
341 .copied()
342 })
343 .collect();
344
345 for mut row in request.rows.rows {
347 let mut aligned_values = Vec::with_capacity(merged_schema.len());
348 for &opt_idx in &col_mapping {
349 aligned_values.push(match opt_idx {
350 Some(idx) => std::mem::take(&mut row.values[idx]),
351 None => null_value.clone(),
352 });
353 }
354 merged_rows.push(Row {
355 values: aligned_values,
356 });
357 table_ids.push(table_id);
358 }
359 }
360
361 Ok((merged_rows, table_ids, merged_version))
362 }
363
364 fn find_physical_region_id(&self, logical_region_id: RegionId) -> Result<RegionId> {
366 let state = self.state.read().unwrap();
367 state
368 .logical_regions()
369 .get(&logical_region_id)
370 .copied()
371 .context(LogicalRegionNotFoundSnafu {
372 region_id: logical_region_id,
373 })
374 }
375
376 pub async fn delete_region(
378 &self,
379 region_id: RegionId,
380 request: RegionDeleteRequest,
381 ) -> Result<AffectedRows> {
382 if self.is_physical_region(region_id) {
383 info!(
384 "Metric region received delete request {request:?} on physical region {region_id:?}"
385 );
386 FORBIDDEN_OPERATION_COUNT.inc();
387
388 UnsupportedRegionRequestSnafu {
389 request: RegionRequest::Delete(request),
390 }
391 .fail()
392 } else {
393 self.delete_logical_region(region_id, request).await
394 }
395 }
396
397 async fn put_logical_region(
398 &self,
399 logical_region_id: RegionId,
400 mut request: RegionPutRequest,
401 ) -> Result<AffectedRows> {
402 let _timer = MITO_OPERATION_ELAPSED
403 .with_label_values(&["put"])
404 .start_timer();
405
406 let (physical_region_id, data_region_id, primary_key_encoding) =
407 self.find_data_region_meta(logical_region_id)?;
408
409 self.verify_rows(logical_region_id, physical_region_id, &request.rows)
410 .await?;
411
412 self.modify_rows(
415 physical_region_id,
416 logical_region_id.table_id(),
417 &mut request.rows,
418 primary_key_encoding,
419 )?;
420 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
421 request.hint = Some(WriteHint {
422 primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
423 });
424 }
425 self.data_region
426 .write_data(data_region_id, RegionRequest::Put(request))
427 .await
428 }
429
430 async fn delete_logical_region(
431 &self,
432 logical_region_id: RegionId,
433 mut request: RegionDeleteRequest,
434 ) -> Result<AffectedRows> {
435 let _timer = MITO_OPERATION_ELAPSED
436 .with_label_values(&["delete"])
437 .start_timer();
438
439 let (physical_region_id, data_region_id, primary_key_encoding) =
440 self.find_data_region_meta(logical_region_id)?;
441
442 self.verify_rows(logical_region_id, physical_region_id, &request.rows)
443 .await?;
444
445 self.modify_rows(
448 physical_region_id,
449 logical_region_id.table_id(),
450 &mut request.rows,
451 primary_key_encoding,
452 )?;
453 if primary_key_encoding == PrimaryKeyEncoding::Sparse {
454 request.hint = Some(WriteHint {
455 primary_key_encoding: PrimaryKeyEncodingProto::Sparse.into(),
456 });
457 }
458 self.data_region
459 .write_data(data_region_id, RegionRequest::Delete(request))
460 .await
461 }
462
463 fn find_data_region_meta(
464 &self,
465 logical_region_id: RegionId,
466 ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
467 let state = self.state.read().unwrap();
468 let physical_region_id = *state
469 .logical_regions()
470 .get(&logical_region_id)
471 .with_context(|| LogicalRegionNotFoundSnafu {
472 region_id: logical_region_id,
473 })?;
474 let data_region_id = to_data_region_id(physical_region_id);
475 let primary_key_encoding = state.get_primary_key_encoding(data_region_id).context(
476 PhysicalRegionNotFoundSnafu {
477 region_id: data_region_id,
478 },
479 )?;
480 Ok((physical_region_id, data_region_id, primary_key_encoding))
481 }
482
483 async fn verify_rows(
489 &self,
490 logical_region_id: RegionId,
491 physical_region_id: RegionId,
492 rows: &Rows,
493 ) -> Result<()> {
494 let data_region_id = to_data_region_id(physical_region_id);
496 let state = self.state.read().unwrap();
497 if !state.is_logical_region_exist(logical_region_id) {
498 error!("Trying to write to an nonexistent region {logical_region_id}");
499 return LogicalRegionNotFoundSnafu {
500 region_id: logical_region_id,
501 }
502 .fail();
503 }
504
505 let physical_columns = state
507 .physical_region_states()
508 .get(&data_region_id)
509 .context(PhysicalRegionNotFoundSnafu {
510 region_id: data_region_id,
511 })?
512 .physical_columns();
513 for col in &rows.schema {
514 ensure!(
515 physical_columns.contains_key(&col.column_name),
516 ColumnNotFoundSnafu {
517 name: col.column_name.clone(),
518 region_id: logical_region_id,
519 }
520 );
521 }
522
523 Ok(())
524 }
525
526 fn modify_rows(
530 &self,
531 physical_region_id: RegionId,
532 table_id: TableId,
533 rows: &mut Rows,
534 encoding: PrimaryKeyEncoding,
535 ) -> Result<()> {
536 let input = std::mem::take(rows);
537 let iter = {
538 let state = self.state.read().unwrap();
539 let name_to_id = state
540 .physical_region_states()
541 .get(&physical_region_id)
542 .with_context(|| PhysicalRegionNotFoundSnafu {
543 region_id: physical_region_id,
544 })?
545 .physical_columns();
546 RowsIter::new(input, name_to_id)
547 };
548 let output =
549 self.row_modifier
550 .modify_rows(iter, TableIdInput::Single(table_id), encoding)?;
551 *rows = output;
552 Ok(())
553 }
554}
555
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_function::utils::partition_expr_version;
    use common_recordbatch::RecordBatches;
    use datatypes::value::Value as PartitionValue;
    use partition::expr::col;
    use store_api::metric_engine_consts::{
        DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
        MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
    };
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{
        EnterStagingRequest, RegionRequest, StagingPartitionDirective,
    };
    use store_api::storage::ScanRequest;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::test_util::{self, TestEnv};

    // Asserts that a merged request's schema matches the chosen primary key
    // encoding: sparse replaces tag/table-id/tsid columns with the encoded
    // primary key column; dense keeps table id, tsid and the tag columns and
    // has no encoded primary key column.
    fn assert_merged_schema(rows: &Rows, expect_sparse: bool) {
        let column_names: HashSet<String> = rows
            .schema
            .iter()
            .map(|col| col.column_name.clone())
            .collect();

        if expect_sparse {
            assert!(
                column_names.contains(PRIMARY_KEY_COLUMN_NAME),
                "sparse encoding should include primary key column"
            );
            assert!(
                !column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
                "sparse encoding should not include table id column"
            );
            assert!(
                !column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
                "sparse encoding should not include tsid column"
            );
            assert!(
                !column_names.contains("job"),
                "sparse encoding should not include tag columns"
            );
            assert!(
                !column_names.contains("instance"),
                "sparse encoding should not include tag columns"
            );
        } else {
            assert!(
                !column_names.contains(PRIMARY_KEY_COLUMN_NAME),
                "dense encoding should not include primary key column"
            );
            assert!(
                column_names.contains(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
                "dense encoding should include table id column"
            );
            assert!(
                column_names.contains(DATA_SCHEMA_TSID_COLUMN_NAME),
                "dense encoding should include tsid column"
            );
            assert!(
                column_names.contains("job"),
                "dense encoding should keep tag columns"
            );
            assert!(
                column_names.contains("instance"),
                "dense encoding should keep tag columns"
            );
        }
    }

    // Builds a JSON partition expression on the "job" tag:
    // "job-0" <= job < "job-9".
    fn job_partition_expr_json() -> String {
        let expr = col("job")
            .gt_eq(PartitionValue::String("job-0".into()))
            .and(col("job").lt(PartitionValue::String("job-9".into())));
        expr.as_json_str().unwrap()
    }

    // Creates a logical region with the given tag columns on top of an
    // existing physical region.
    async fn create_logical_region_with_tags(
        env: &TestEnv,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        tags: &[&str],
    ) {
        let region_create_request = test_util::create_logical_region_request(
            tags,
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Create(region_create_request),
            )
            .await
            .unwrap();
    }

    // End-to-end batch-write scenario shared by the sparse and dense
    // encoding tests: creates two logical regions with different tag sets,
    // checks the merged request produced by the encoding-specific merge
    // function, then performs the batch write and verifies the row count via
    // a scan of the data region.
    async fn run_batch_write_with_schema_variants(
        env: &TestEnv,
        physical_region_id: RegionId,
        options: Vec<(String, String)>,
        expect_sparse: bool,
    ) {
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), options)
            .await;

        let logical_region_1 = env.default_logical_region_id();
        let logical_region_2 = RegionId::new(1024, 1);

        // Two logical regions with different tag sets to exercise schema
        // union/alignment during the merge.
        create_logical_region_with_tags(env, physical_region_id, logical_region_1, &["job"]).await;
        create_logical_region_with_tags(
            env,
            physical_region_id,
            logical_region_2,
            &["job", "instance"],
        )
        .await;

        let schema_1 = test_util::row_schema_with_tags(&["job"]);
        let schema_2 = test_util::row_schema_with_tags(&["job", "instance"]);

        // Data region id derived from the physical region's table id —
        // presumably matches the utils::to_data_region_id convention.
        let data_region_id = RegionId::new(physical_region_id.table_id(), 2);
        let primary_key_encoding = env
            .metric()
            .inner
            .get_primary_key_encoding(data_region_id)
            .unwrap();
        assert_eq!(
            primary_key_encoding,
            if expect_sparse {
                PrimaryKeyEncoding::Sparse
            } else {
                PrimaryKeyEncoding::Dense
            }
        );

        // 3 rows for region 1 plus 2 rows for region 2 = 5 total.
        let build_requests = || {
            let rows_1 = test_util::build_rows(1, 3);
            let rows_2 = test_util::build_rows(2, 2);

            vec![
                (
                    logical_region_1,
                    RegionPutRequest {
                        rows: Rows {
                            schema: schema_1.clone(),
                            rows: rows_1,
                        },
                        hint: None,
                        partition_expr_version: None,
                    },
                ),
                (
                    logical_region_2,
                    RegionPutRequest {
                        rows: Rows {
                            schema: schema_2.clone(),
                            rows: rows_2,
                        },
                        hint: None,
                        partition_expr_version: None,
                    },
                ),
            ]
        };

        // Sparse merging must attach the sparse write hint; dense merging
        // must leave the hint unset.
        let merged_request = if expect_sparse {
            let (merged_request, _) = env
                .metric()
                .inner
                .merge_sparse_batch(physical_region_id, build_requests())
                .unwrap();
            let hint = merged_request
                .hint
                .as_ref()
                .expect("missing sparse write hint");
            assert_eq!(
                hint.primary_key_encoding,
                PrimaryKeyEncodingProto::Sparse as i32
            );
            merged_request
        } else {
            let (merged_request, _) = env
                .metric()
                .inner
                .merge_dense_batch(data_region_id, build_requests())
                .unwrap();
            assert!(merged_request.hint.is_none());
            merged_request
        };

        assert_merged_schema(&merged_request.rows, expect_sparse);

        let affected_rows = env
            .metric()
            .inner
            .put_regions_batch(build_requests().into_iter())
            .await
            .unwrap();
        assert_eq!(affected_rows, 5);

        // Scan the data region and confirm all 5 rows landed.
        let request = ScanRequest::default();
        let stream = env
            .mito()
            .scan_to_stream(data_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 5);
    }

    // Single put to a logical region: verifies affected rows and the exact
    // contents visible through both the physical and the logical region.
    #[tokio::test]
    async fn test_write_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_expr_version: None,
        });

        let logical_region_id = env.default_logical_region_id();
        let result = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(result.affected_rows, 5);

        // Scanning the physical region exposes the internal table id / tsid
        // columns.
        let physical_region_id = env.default_physical_region_id();
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let expected = "\
+-------------------------+----------------+------------+---------------------+-------+
| greptime_timestamp      | greptime_value | __table_id | __tsid              | job   |
+-------------------------+----------------+------------+---------------------+-------+
| 1970-01-01T00:00:00     | 0.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | 3          | 2955007454552897459 | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | 3          | 2955007454552897459 | tag_0 |
+-------------------------+----------------+------------+---------------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "physical region");

        // Scanning the logical region hides the internal columns.
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        let expected = "\
+-------------------------+----------------+-------+
| greptime_timestamp      | greptime_value | job   |
+-------------------------+----------------+-------+
| 1970-01-01T00:00:00     | 0.0            | tag_0 |
| 1970-01-01T00:00:00.001 | 1.0            | tag_0 |
| 1970-01-01T00:00:00.002 | 2.0            | tag_0 |
| 1970-01-01T00:00:00.003 | 3.0            | tag_0 |
| 1970-01-01T00:00:00.004 | 4.0            | tag_0 |
+-------------------------+----------------+-------+";
        assert_eq!(expected, batches.pretty_print().unwrap(), "logical region");
    }

    // Put after altering the logical region with extra tag columns; only the
    // affected row count is checked.
    #[tokio::test]
    async fn test_write_logical_region_row_count() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // Add three tag columns (mixed case on purpose) before writing.
        let logical_region_id = env.default_logical_region_id();
        let columns = &["odd", "even", "Ev_En"];
        let alter_request = test_util::alter_logical_region_add_tag_columns(123456, columns);
        engine
            .handle_request(logical_region_id, RegionRequest::Alter(alter_request))
            .await
            .unwrap();

        let schema = test_util::row_schema_with_tags(columns);
        let rows = test_util::build_rows(3, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_expr_version: None,
        });

        let result = engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(100, result.affected_rows);
    }

    // Writing directly to a physical region must be rejected.
    #[tokio::test]
    async fn test_write_physical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let schema = test_util::row_schema_with_tags(&["abc"]);
        let rows = test_util::build_rows(1, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_expr_version: None,
        });

        engine
            .handle_request(physical_region_id, request)
            .await
            .unwrap_err();
    }

    // Writing to a logical region that was never created must fail.
    #[tokio::test]
    async fn test_write_nonexist_logical_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let logical_region_id = RegionId::new(175, 8345);
        let schema = test_util::row_schema_with_tags(&["def"]);
        let rows = test_util::build_rows(1, 100);
        let request = RegionRequest::Put(RegionPutRequest {
            rows: Rows { schema, rows },
            hint: None,
            partition_expr_version: None,
        });

        engine
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
    }

    // Batch write spanning three logical regions on one physical region;
    // timestamps are shifted per region so all 10 rows remain distinct.
    #[tokio::test]
    async fn test_batch_write_multiple_logical_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let logical_region_1 = env.default_logical_region_id();
        let logical_region_2 = RegionId::new(1024, 1);
        let logical_region_3 = RegionId::new(1024, 2);

        env.create_logical_region(physical_region_id, logical_region_2)
            .await;
        env.create_logical_region(physical_region_id, logical_region_3)
            .await;

        let schema = test_util::row_schema_with_tags(&["job"]);

        let rows1 = test_util::build_rows(1, 3);
        let mut rows2 = test_util::build_rows(1, 2);
        let mut rows3 = test_util::build_rows(1, 5);

        // Shift timestamps of regions 2 and 3 (to 10.. and 20..) so rows do
        // not collide across regions.
        use api::v1::value::ValueData;
        for (i, row) in rows2.iter_mut().enumerate() {
            if let Some(ValueData::TimestampMillisecondValue(ts)) =
                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
            {
                *ts = (10 + i) as i64;
            }
        }
        for (i, row) in rows3.iter_mut().enumerate() {
            if let Some(ValueData::TimestampMillisecondValue(ts)) =
                row.values.get_mut(0).and_then(|v| v.value_data.as_mut())
            {
                *ts = (20 + i) as i64;
            }
        }

        let requests = vec![
            (
                logical_region_1,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: rows1,
                    },
                    hint: None,
                    partition_expr_version: None,
                },
            ),
            (
                logical_region_2,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: rows2,
                    },
                    hint: None,
                    partition_expr_version: None,
                },
            ),
            (
                logical_region_3,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: rows3,
                    },
                    hint: None,
                    partition_expr_version: None,
                },
            ),
        ];

        // 3 + 2 + 5 = 10 rows in total.
        let affected_rows = engine
            .inner
            .put_regions_batch(requests.into_iter())
            .await
            .unwrap();
        assert_eq!(affected_rows, 10);

        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 10);
    }

    // A batch containing a nonexistent region must fail as a whole: nothing
    // is written, not even the valid requests.
    #[tokio::test]
    async fn test_batch_write_with_partial_failure() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let physical_region_id = env.default_physical_region_id();
        let logical_region_1 = env.default_logical_region_id();
        let logical_region_2 = RegionId::new(1024, 1);
        let nonexistent_region = RegionId::new(9999, 9999);

        env.create_logical_region(physical_region_id, logical_region_2)
            .await;

        let schema = test_util::row_schema_with_tags(&["job"]);
        let requests = vec![
            (
                logical_region_1,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: test_util::build_rows(1, 3),
                    },
                    hint: None,
                    partition_expr_version: None,
                },
            ),
            (
                nonexistent_region,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: test_util::build_rows(1, 2),
                    },
                    hint: None,
                    partition_expr_version: None,
                },
            ),
            (
                logical_region_2,
                RegionPutRequest {
                    rows: Rows {
                        schema: schema.clone(),
                        rows: test_util::build_rows(1, 5),
                    },
                    hint: None,
                    partition_expr_version: None,
                },
            ),
        ];

        let result = engine.inner.put_regions_batch(requests.into_iter()).await;
        assert!(result.is_err());

        // Verify nothing was written.
        let request = ScanRequest::default();
        let stream = env
            .metric()
            .scan_to_stream(physical_region_id, request)
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();

        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
    }

    // A single-element batch takes the fast path straight to
    // put_logical_region.
    #[tokio::test]
    async fn test_batch_write_single_request_fast_path() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let logical_region_id = env.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);

        let requests = vec![(
            logical_region_id,
            RegionPutRequest {
                rows: Rows {
                    schema,
                    rows: test_util::build_rows(1, 5),
                },
                hint: None,
                partition_expr_version: None,
            },
        )];

        let affected_rows = engine
            .inner
            .put_regions_batch(requests.into_iter())
            .await
            .unwrap();
        assert_eq!(affected_rows, 5);
    }

    // An empty batch is a no-op returning zero affected rows.
    #[tokio::test]
    async fn test_batch_write_empty_requests() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        let requests = vec![];
        let affected_rows = engine
            .inner
            .put_regions_batch(requests.into_iter())
            .await
            .unwrap();

        assert_eq!(affected_rows, 0);
    }

    // Batch-write scenario with the physical region configured for sparse
    // primary key encoding.
    #[tokio::test]
    async fn test_batch_write_sparse_encoding() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();

        run_batch_write_with_schema_variants(
            &env,
            physical_region_id,
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "sparse".to_string(),
            )],
            true,
        )
        .await;
    }

    // Batch-write scenario with the physical region configured for dense
    // primary key encoding.
    #[tokio::test]
    async fn test_batch_write_dense_encoding() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();

        run_batch_write_with_schema_variants(
            &env,
            physical_region_id,
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "dense".to_string(),
            )],
            false,
        )
        .await;
    }

    // A put carrying a partition expr version when the region has none must
    // be rejected with InvalidArguments.
    #[tokio::test]
    async fn test_metric_put_rejects_bad_partition_expr_version() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let rows = Rows {
            schema: test_util::row_schema_with_tags(&["job"]),
            rows: test_util::build_rows(1, 3),
        };

        let err = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows,
                    hint: None,
                    partition_expr_version: Some(1),
                }),
            )
            .await
            .unwrap_err();

        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }

    // After entering staging with a new partition expr: a put with a wrong
    // version is rejected, while puts with no version or the matching
    // version succeed.
    #[tokio::test]
    async fn test_metric_put_respects_staging_partition_expr_version() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;

        let logical_region_id = env.default_logical_region_id();
        let physical_region_id = env.default_physical_region_id();
        let partition_expr = job_partition_expr_json();
        env.metric()
            .handle_request(
                physical_region_id,
                RegionRequest::EnterStaging(EnterStagingRequest {
                    partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
                        partition_expr.clone(),
                    ),
                }),
            )
            .await
            .unwrap();

        let expected_version = partition_expr_version(Some(&partition_expr));
        let rows = Rows {
            schema: test_util::row_schema_with_tags(&["job"]),
            rows: test_util::build_rows(1, 3),
        };

        // Mismatched version -> rejected.
        let err = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: rows.clone(),
                    hint: None,
                    partition_expr_version: Some(expected_version.wrapping_add(1)),
                }),
            )
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);

        // No version -> accepted.
        let response = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: rows.clone(),
                    hint: None,
                    partition_expr_version: None,
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        // Matching version -> accepted.
        let response = env
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows,
                    hint: None,
                    partition_expr_version: Some(expected_version),
                }),
            )
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);
    }
}