1use std::collections::HashSet;
16
17use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
18use bytes::Bytes;
19use common_error::ext::ErrorExt;
20use common_error::status_code::StatusCode;
21use common_grpc::flight::{FlightEncoder, FlightMessage};
22use common_query::prelude::{greptime_timestamp, greptime_value};
23use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
24use datatypes::arrow::record_batch::RecordBatch;
25use snafu::{OptionExt, ensure};
26use store_api::codec::PrimaryKeyEncoding;
27use store_api::metadata::RegionMetadataRef;
28use store_api::region_request::{
29 AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
30};
31use store_api::storage::RegionId;
32
33use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
34use crate::engine::MetricEngineInner;
35use crate::error;
36use crate::error::Result;
37use crate::metrics::MITO_OPERATION_ELAPSED;
38
impl MetricEngineInner {
    /// Entry point for bulk inserts into a metric-engine region.
    ///
    /// Empty payloads short-circuit to `Ok(0)`. Physical regions are written
    /// directly; logical regions are translated onto their backing physical
    /// (data) region first. Each path is timed under its own
    /// `MITO_OPERATION_ELAPSED` label.
    pub async fn bulk_insert_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        // Nothing to write — avoid touching timers or the write path.
        if request.payload.num_rows() == 0 {
            return Ok(0);
        }
        if self.is_physical_region(region_id) {
            let _timer = MITO_OPERATION_ELAPSED
                .with_label_values(&["bulk_insert_physical"])
                .start_timer();
            return self.bulk_insert_physical_region(region_id, request).await;
        }

        let _timer = MITO_OPERATION_ELAPSED
            .with_label_values(&["bulk_insert_logical"])
            .start_timer();
        self.bulk_insert_logical_region(region_id, request).await
    }

    /// Writes a bulk insert request straight to a physical region,
    /// forwarding it unmodified to the underlying data region engine.
    async fn bulk_insert_physical_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        self.data_region
            .write_data(region_id, RegionRequest::BulkInserts(request))
            .await
    }

    /// Translates a logical-region bulk insert onto its physical data region.
    ///
    /// Only regions using sparse primary-key encoding are supported; others
    /// fail with `UnsupportedRegionRequest`. The payload batch is rewritten
    /// via `modify_batch_sparse` (encoding tag columns for the physical
    /// schema), re-encoded to Arrow IPC, and written to the data region.
    /// If the data region rejects bulk inserts with `StatusCode::Unsupported`,
    /// the original batch is converted to row protocol and retried through
    /// the regular put path.
    async fn bulk_insert_logical_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        let (physical_region_id, data_region_id, primary_key_encoding) =
            self.find_data_region_meta(region_id)?;

        // Bulk translation below assumes the sparse primary-key codec.
        if primary_key_encoding != PrimaryKeyEncoding::Sparse {
            return error::UnsupportedRegionRequestSnafu {
                request: RegionRequest::BulkInserts(request),
            }
            .fail();
        }

        // `payload` is moved out of `request`; only `partition_expr_version`
        // is read from the partially-moved `request` afterwards.
        let batch = request.payload;
        if batch.num_rows() == 0 {
            return Ok(0);
        }

        let logical_metadata = self
            .logical_region_metadata(physical_region_id, region_id)
            .await?;
        let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
            region_id,
            data_region_id,
            &batch,
            &logical_metadata,
        )?;
        // Keep the original `batch` alive for the row-protocol fallback below.
        let modified_batch = modify_batch_sparse(
            batch.clone(),
            region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )?;
        let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;

        let partition_expr_version = request.partition_expr_version;
        let request = RegionBulkInsertsRequest {
            region_id: data_region_id,
            payload: modified_batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version,
        };
        match self
            .data_region
            .write_data(data_region_id, RegionRequest::BulkInserts(request))
            .await
        {
            Ok(affected_rows) => Ok(affected_rows),
            // Fallback: the data region doesn't support bulk inserts
            // (e.g. a non-flat format) — replay the original batch as rows.
            Err(err) if err.status_code() == StatusCode::Unsupported => {
                let rows = record_batch_to_rows(&batch, region_id)?;
                self.put_region(
                    region_id,
                    RegionPutRequest {
                        rows,
                        hint: None,
                        partition_expr_version,
                    },
                )
                .await
            }
            Err(err) => Err(err),
        }
    }

    /// Maps each column of `batch` to the physical region's column ids,
    /// splitting columns into tag columns (per the logical metadata) and
    /// non-tag column indices.
    ///
    /// Errors with `PhysicalRegionNotFound` if the data region is not
    /// registered in the engine state, or `ColumnNotFound` if a batch column
    /// has no physical counterpart. Tag columns are returned sorted by name
    /// (the order the sparse encoder expects).
    fn resolve_tag_columns_from_metadata(
        &self,
        logical_region_id: RegionId,
        data_region_id: RegionId,
        batch: &RecordBatch,
        logical_metadata: &RegionMetadataRef,
    ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
        // Names of the logical region's tag columns, for fast membership tests.
        let tag_names: HashSet<&str> = logical_metadata
            .column_metadatas
            .iter()
            .filter_map(|column| {
                if column.semantic_type == SemanticType::Tag {
                    Some(column.column_schema.name.as_str())
                } else {
                    None
                }
            })
            .collect();

        let mut tag_columns = Vec::new();
        let mut non_tag_indices = Vec::new();
        {
            // Inner scope bounds the read-lock on engine state: the lock is
            // released before sorting below.
            let state = self.state.read().unwrap();
            let physical_columns = state
                .physical_region_states()
                .get(&data_region_id)
                .context(error::PhysicalRegionNotFoundSnafu {
                    region_id: data_region_id,
                })?
                .physical_columns();

            for (index, field) in batch.schema().fields().iter().enumerate() {
                let name = field.name();
                let column_id =
                    *physical_columns
                        .get(name)
                        .with_context(|| error::ColumnNotFoundSnafu {
                            name: name.clone(),
                            region_id: logical_region_id,
                        })?;
                if tag_names.contains(name.as_str()) {
                    tag_columns.push(TagColumnInfo {
                        name: name.clone(),
                        index,
                        column_id,
                    });
                } else {
                    non_tag_indices.push(index);
                }
            }
        }

        tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
        Ok((tag_columns, non_tag_indices))
    }
}
216
217fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
218 let schema_ref = batch.schema();
219 let fields = schema_ref.fields();
220
221 let mut ts_idx = None;
222 let mut val_idx = None;
223 let mut tag_indices = Vec::new();
224
225 for (idx, field) in fields.iter().enumerate() {
226 if field.name() == greptime_timestamp() {
227 ts_idx = Some(idx);
228 if !matches!(
229 field.data_type(),
230 datatypes::arrow::datatypes::DataType::Timestamp(
231 datatypes::arrow::datatypes::TimeUnit::Millisecond,
232 _
233 )
234 ) {
235 return error::UnexpectedRequestSnafu {
236 reason: format!(
237 "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
238 field.name(),
239 logical_region_id,
240 field.data_type()
241 ),
242 }
243 .fail();
244 }
245 } else if field.name() == greptime_value() {
246 val_idx = Some(idx);
247 if !matches!(
248 field.data_type(),
249 datatypes::arrow::datatypes::DataType::Float64
250 ) {
251 return error::UnexpectedRequestSnafu {
252 reason: format!(
253 "Value column '{}' in region {:?} has incompatible type: {:?}",
254 field.name(),
255 logical_region_id,
256 field.data_type()
257 ),
258 }
259 .fail();
260 }
261 } else {
262 if !matches!(
263 field.data_type(),
264 datatypes::arrow::datatypes::DataType::Utf8
265 ) {
266 return error::UnexpectedRequestSnafu {
267 reason: format!(
268 "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
269 field.name(),
270 logical_region_id,
271 field.data_type()
272 ),
273 }
274 .fail();
275 }
276 tag_indices.push(idx);
277 }
278 }
279
280 let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
281 reason: format!(
282 "Timestamp column '{}' not found in RecordBatch for region {:?}",
283 greptime_timestamp(),
284 logical_region_id
285 ),
286 })?;
287 let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
288 reason: format!(
289 "Value column '{}' not found in RecordBatch for region {:?}",
290 greptime_value(),
291 logical_region_id
292 ),
293 })?;
294
295 let mut schema = Vec::with_capacity(2 + tag_indices.len());
296 schema.push(api::v1::ColumnSchema {
297 column_name: greptime_timestamp().to_string(),
298 datatype: ColumnDataType::TimestampMillisecond as i32,
299 semantic_type: SemanticType::Timestamp as i32,
300 datatype_extension: None,
301 options: None,
302 });
303 schema.push(api::v1::ColumnSchema {
304 column_name: greptime_value().to_string(),
305 datatype: ColumnDataType::Float64 as i32,
306 semantic_type: SemanticType::Field as i32,
307 datatype_extension: None,
308 options: None,
309 });
310 for &idx in &tag_indices {
311 let field = &fields[idx];
312 schema.push(api::v1::ColumnSchema {
313 column_name: field.name().clone(),
314 datatype: ColumnDataType::String as i32,
315 semantic_type: SemanticType::Tag as i32,
316 datatype_extension: None,
317 options: None,
318 });
319 }
320
321 let ts_array = batch
322 .column(ts_idx)
323 .as_any()
324 .downcast_ref::<TimestampMillisecondArray>()
325 .expect("validated as TimestampMillisecond");
326 let val_array = batch
327 .column(val_idx)
328 .as_any()
329 .downcast_ref::<Float64Array>()
330 .expect("validated as Float64");
331 let tag_arrays: Vec<&StringArray> = tag_indices
332 .iter()
333 .map(|&idx| {
334 batch
335 .column(idx)
336 .as_any()
337 .downcast_ref::<StringArray>()
338 .expect("validated as Utf8")
339 })
340 .collect();
341
342 let num_rows = batch.num_rows();
343 let mut rows = Vec::with_capacity(num_rows);
344 for row_idx in 0..num_rows {
345 let mut values = Vec::with_capacity(2 + tag_arrays.len());
346
347 if ts_array.is_null(row_idx) {
348 values.push(api::v1::Value { value_data: None });
349 } else {
350 values.push(api::v1::Value {
351 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
352 ts_array.value(row_idx),
353 )),
354 });
355 }
356
357 if val_array.is_null(row_idx) {
358 values.push(api::v1::Value { value_data: None });
359 } else {
360 values.push(api::v1::Value {
361 value_data: Some(api::v1::value::ValueData::F64Value(
362 val_array.value(row_idx),
363 )),
364 });
365 }
366
367 for arr in &tag_arrays {
368 if arr.is_null(row_idx) {
369 values.push(api::v1::Value { value_data: None });
370 } else {
371 values.push(api::v1::Value {
372 value_data: Some(api::v1::value::ValueData::StringValue(
373 arr.value(row_idx).to_string(),
374 )),
375 });
376 }
377 }
378
379 rows.push(api::v1::Row { values });
380 }
381
382 Ok(api::v1::Rows { schema, rows })
383}
384
385fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
386 let mut encoder = FlightEncoder::default();
387 let schema = encoder.encode_schema(record_batch.schema().as_ref());
388 let mut iter = encoder
389 .encode(FlightMessage::RecordBatch(record_batch.clone()))
390 .into_iter();
391
392 let Some(flight_data) = iter.next() else {
393 return error::UnexpectedRequestSnafu {
394 reason: "Failed to encode empty flight data",
395 }
396 .fail();
397 };
398 ensure!(
399 iter.next().is_none(),
400 error::UnexpectedRequestSnafu {
401 reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
402 }
403 );
404
405 Ok((
406 schema.data_header,
407 flight_data.data_header,
408 flight_data.data_body,
409 ))
410}
411
#[cfg(test)]
mod tests {
    // Fix: `use std::assert_matches;` only imported the module, so the bare
    // `assert_matches!(..)` invocations below would not resolve. Import the
    // macro itself (relies on the crate enabling `#![feature(assert_matches)]`).
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use api::v1::ArrowIpc;
    use common_error::ext::ErrorExt;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_recordbatch::RecordBatches;
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
    use mito2::config::MitoConfig;
    use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
    use store_api::storage::{RegionId, ScanRequest};

    use super::record_batch_to_ipc;
    use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
    use crate::error::Error;
    use crate::test_util::{self, TestEnv};

    /// Builds a logical-schema batch (`ts`, `value`, `job`) with `rows` rows
    /// whose timestamps/values start at `start` and whose tag is "tag_0".
    fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
        ]));

        let mut ts = Vec::with_capacity(rows);
        let mut values = Vec::with_capacity(rows);
        let mut tags = Vec::with_capacity(rows);
        for i in start..start + rows {
            ts.push(i as i64);
            values.push(i as f64);
            tags.push("tag_0".to_string());
        }

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(ts)),
                Arc::new(Float64Array::from(values)),
                Arc::new(StringArray::from(tags)),
            ],
        )
        .unwrap()
    }

    /// Wraps `batch` in a bulk-inserts region request, pre-encoding the IPC
    /// payload the same way the write path does.
    fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
        let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version: None,
        })
    }

    /// Creates a physical region with *dense* primary-key encoding plus one
    /// logical region on top of it; returns the logical region id.
    async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(
            physical_region_id,
            &TestEnv::default_table_dir(),
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "dense".to_string(),
            )],
        )
        .await;

        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["job"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();
        logical_region_id
    }

    #[tokio::test]
    async fn test_bulk_insert_empty_batch_returns_zero() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let batch = build_logical_batch(0, 0);
        let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc::default(),
            partition_expr_version: None,
        });
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_passthrough() {
        let mito_config = MitoConfig {
            default_flat_format: true,
            ..Default::default()
        };
        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();
        let logical_region_id = env.default_logical_region_id();

        // First write through the logical path.
        let logical_batch = build_logical_batch(0, 3);
        let logical_request = build_bulk_request(logical_region_id, logical_batch.clone());
        let response = env
            .metric()
            .handle_request(logical_region_id, logical_request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        // Then write directly to the physical region using a pre-modified
        // (sparse-encoded) batch.
        let tag_columns = vec![TagColumnInfo {
            name: "job".to_string(),
            index: 2,
            column_id: 2,
        }];
        let non_tag_indices = vec![0, 1];
        let second_batch = build_logical_batch(3, 3);
        let physical_batch = modify_batch_sparse(
            second_batch,
            logical_region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )
        .unwrap();
        let request = build_bulk_request(physical_region_id, physical_batch);
        let response = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        // Both writes should be visible from a logical scan.
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 6);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_empty_batch() {
        let mito_config = MitoConfig {
            default_flat_format: true,
            ..Default::default()
        };
        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();

        let batch = build_logical_batch(0, 0);
        let request = build_bulk_request(physical_region_id, batch);
        let response = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_unknown_column_errors() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("nonexistent_column", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64])),
                Arc::new(Float64Array::from(vec![1.0])),
                Arc::new(StringArray::from(vec!["val"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::ColumnNotFound { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_multiple_tag_columns() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
            .await;
        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["host", "region"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("host", DataType::Utf8, true),
            Field::new("region", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
                Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
                Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
            ],
        )
        .unwrap();

        let request = build_bulk_request(logical_region_id, batch);
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_bulk_insert_accumulates_rows() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);

        let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
    }

    #[tokio::test]
    async fn test_bulk_insert_sparse_encoding() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 4);

        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 4);
    }

    #[tokio::test]
    async fn test_bulk_insert_dense_encoding_rejected() {
        let env = TestEnv::new().await;
        let logical_region_id = init_dense_metric_region(&env).await;

        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_matches_put() {
        // Write the same data through the put path and the bulk path in two
        // separate environments and compare the scan outputs.
        let env_put = TestEnv::new().await;
        env_put.init_metric_region().await;
        let logical_region_id = env_put.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        env_put
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: api::v1::Rows { schema, rows },
                    hint: None,
                    partition_expr_version: None,
                }),
            )
            .await
            .unwrap();
        let put_stream = env_put
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
        let put_output = put_batches.pretty_print().unwrap();

        let env_bulk = TestEnv::new().await;
        env_bulk.init_metric_region().await;
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
        env_bulk
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        let bulk_stream = env_bulk
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
        let bulk_output = bulk_batches.pretty_print().unwrap();

        assert_eq!(put_output, bulk_output);
    }

    #[test]
    fn test_record_batch_to_rows_with_null_values() {
        use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
        use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
        use datatypes::arrow::record_batch::RecordBatch;
        use store_api::storage::RegionId;

        use crate::engine::bulk_insert::record_batch_to_rows;

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));

        let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
        let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
        let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
        let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(ts_array),
                Arc::new(val_array),
                Arc::new(job_array),
                Arc::new(host_array),
            ],
        )
        .unwrap();

        let region_id = RegionId::new(1, 1);
        let rows = record_batch_to_rows(&batch, region_id).unwrap();

        assert_eq!(rows.rows.len(), 3);
        assert_eq!(rows.schema.len(), 4);

        // Row 0: ts/value/job present, host null.
        assert!(rows.rows[0].values[0].value_data.is_some());
        assert!(rows.rows[0].values[1].value_data.is_some());
        assert!(rows.rows[0].values[2].value_data.is_some());
        assert!(rows.rows[0].values[3].value_data.is_none());

        // Row 1: ts and job null.
        assert!(rows.rows[1].values[0].value_data.is_none());
        assert!(rows.rows[1].values[1].value_data.is_some());
        assert!(rows.rows[1].values[2].value_data.is_none());
        assert!(rows.rows[1].values[3].value_data.is_some());

        // Row 2: value null.
        assert!(rows.rows[2].values[0].value_data.is_some());
        assert!(rows.rows[2].values[1].value_data.is_none());
        assert!(rows.rows[2].values[2].value_data.is_some());
        assert!(rows.rows[2].values[3].value_data.is_some());
    }
}