mito2/worker/
handle_bulk_insert.rs1use datatypes::arrow;
18use store_api::logstore::LogStore;
19use store_api::metadata::RegionMetadataRef;
20use store_api::region_request::RegionBulkInsertsRequest;
21
22use crate::error::InconsistentTimestampLengthSnafu;
23use crate::memtable::bulk::part::BulkPart;
24use crate::request::{OptionOutputTx, SenderBulkRequest};
25use crate::worker::RegionWorkerLoop;
26use crate::{error, metrics};
27
28impl<S: LogStore> RegionWorkerLoop<S> {
29 pub(crate) async fn handle_bulk_insert_batch(
30 &mut self,
31 region_metadata: RegionMetadataRef,
32 request: RegionBulkInsertsRequest,
33 pending_bulk_request: &mut Vec<SenderBulkRequest>,
34 sender: OptionOutputTx,
35 ) {
36 let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
37 .with_label_values(&["process_bulk_req"])
38 .start_timer();
39 let batch = request.payload;
40 if batch.num_rows() == 0 {
41 sender.send(Ok(0));
42 return;
43 }
44
45 let Some((ts_index, ts)) = batch
46 .schema()
47 .column_with_name(®ion_metadata.time_index_column().column_schema.name)
48 .map(|(index, _)| (index, batch.column(index)))
49 else {
50 sender.send(
51 error::InvalidRequestSnafu {
52 region_id: region_metadata.region_id,
53 reason: format!(
54 "timestamp column `{}` not found",
55 region_metadata.time_index_column().column_schema.name
56 ),
57 }
58 .fail(),
59 );
60 return;
61 };
62
63 if batch.num_rows() != ts.len() {
64 sender.send(
65 InconsistentTimestampLengthSnafu {
66 expected: batch.num_rows(),
67 actual: ts.len(),
68 }
69 .fail(),
70 );
71 return;
72 }
73
74 let (ts_primitive, _) = datatypes::timestamp::timestamp_array_to_primitive(ts).unwrap();
76
77 let min_ts = arrow::compute::min(&ts_primitive).unwrap();
79 let max_ts = arrow::compute::max(&ts_primitive).unwrap();
80
81 let part = BulkPart {
82 batch,
83 max_ts,
84 min_ts,
85 sequence: 0,
86 timestamp_index: ts_index,
87 raw_data: Some(request.raw_data),
88 };
89 pending_bulk_request.push(SenderBulkRequest {
90 sender,
91 request: part,
92 region_id: request.region_id,
93 region_metadata,
94 });
95 }
96}