// mito2/worker/handle_bulk_insert.rs

use datatypes::arrow;
use datatypes::arrow::array::{
    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::RegionBulkInsertsRequest;

use crate::memtable::bulk::part::BulkPart;
use crate::request::{OptionOutputTx, SenderBulkRequest};
use crate::worker::RegionWorkerLoop;
use crate::{error, metrics};
32impl<S: LogStore> RegionWorkerLoop<S> {
33 pub(crate) async fn handle_bulk_insert_batch(
34 &mut self,
35 region_metadata: RegionMetadataRef,
36 request: RegionBulkInsertsRequest,
37 pending_bulk_request: &mut Vec<SenderBulkRequest>,
38 sender: OptionOutputTx,
39 ) {
40 let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
41 .with_label_values(&["process_bulk_req"])
42 .start_timer();
43 let batch = request.payload;
44 let num_rows = batch.num_rows();
45
46 let Some((ts_index, ts)) = batch
47 .schema()
48 .column_with_name(®ion_metadata.time_index_column().column_schema.name)
49 .map(|(index, _)| (index, batch.column(index)))
50 else {
51 sender.send(
52 error::InvalidRequestSnafu {
53 region_id: region_metadata.region_id,
54 reason: format!(
55 "timestamp column `{}` not found",
56 region_metadata.time_index_column().column_schema.name
57 ),
58 }
59 .fail(),
60 );
61 return;
62 };
63
64 let DataType::Timestamp(unit, _) = ts.data_type() else {
65 unreachable!()
67 };
68
69 let (min_ts, max_ts) = match unit {
70 TimeUnit::Second => {
71 let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
72 (
73 arrow::compute::min(ts).unwrap(),
75 arrow::compute::max(ts).unwrap(),
76 )
77 }
78
79 TimeUnit::Millisecond => {
80 let ts = ts
81 .as_any()
82 .downcast_ref::<TimestampMillisecondArray>()
83 .unwrap();
84 (
85 arrow::compute::min(ts).unwrap(),
87 arrow::compute::max(ts).unwrap(),
88 )
89 }
90 TimeUnit::Microsecond => {
91 let ts = ts
92 .as_any()
93 .downcast_ref::<TimestampMicrosecondArray>()
94 .unwrap();
95 (
96 arrow::compute::min(ts).unwrap(),
98 arrow::compute::max(ts).unwrap(),
99 )
100 }
101 TimeUnit::Nanosecond => {
102 let ts = ts
103 .as_any()
104 .downcast_ref::<TimestampNanosecondArray>()
105 .unwrap();
106 (
107 arrow::compute::min(ts).unwrap(),
109 arrow::compute::max(ts).unwrap(),
110 )
111 }
112 };
113
114 let part = BulkPart {
115 batch,
116 num_rows,
117 max_ts,
118 min_ts,
119 sequence: 0,
120 timestamp_index: ts_index,
121 };
122 pending_bulk_request.push(SenderBulkRequest {
123 sender,
124 request: part,
125 region_id: request.region_id,
126 region_metadata,
127 });
128 }
129}