// mito2/worker/handle_bulk_insert.rs
use datatypes::arrow;
18use datatypes::arrow::array::{
19 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
20 TimestampSecondArray,
21};
22use datatypes::arrow::datatypes::{DataType, TimeUnit};
23use store_api::logstore::LogStore;
24use store_api::metadata::RegionMetadataRef;
25use store_api::region_request::RegionBulkInsertsRequest;
26
27use crate::memtable::bulk::part::BulkPart;
28use crate::request::{OptionOutputTx, SenderBulkRequest};
29use crate::worker::RegionWorkerLoop;
30use crate::{error, metrics};
31
32impl<S: LogStore> RegionWorkerLoop<S> {
33 pub(crate) async fn handle_bulk_insert_batch(
34 &mut self,
35 region_metadata: RegionMetadataRef,
36 request: RegionBulkInsertsRequest,
37 pending_bulk_request: &mut Vec<SenderBulkRequest>,
38 sender: OptionOutputTx,
39 ) {
40 let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
41 .with_label_values(&["process_bulk_req"])
42 .start_timer();
43 let batch = request.payload;
44
45 let Some((ts_index, ts)) = batch
46 .schema()
47 .column_with_name(®ion_metadata.time_index_column().column_schema.name)
48 .map(|(index, _)| (index, batch.column(index)))
49 else {
50 sender.send(
51 error::InvalidRequestSnafu {
52 region_id: region_metadata.region_id,
53 reason: format!(
54 "timestamp column `{}` not found",
55 region_metadata.time_index_column().column_schema.name
56 ),
57 }
58 .fail(),
59 );
60 return;
61 };
62
63 let DataType::Timestamp(unit, _) = ts.data_type() else {
64 unreachable!()
66 };
67
68 let (min_ts, max_ts) = match unit {
69 TimeUnit::Second => {
70 let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
71 (
72 arrow::compute::min(ts).unwrap(),
74 arrow::compute::max(ts).unwrap(),
75 )
76 }
77
78 TimeUnit::Millisecond => {
79 let ts = ts
80 .as_any()
81 .downcast_ref::<TimestampMillisecondArray>()
82 .unwrap();
83 (
84 arrow::compute::min(ts).unwrap(),
86 arrow::compute::max(ts).unwrap(),
87 )
88 }
89 TimeUnit::Microsecond => {
90 let ts = ts
91 .as_any()
92 .downcast_ref::<TimestampMicrosecondArray>()
93 .unwrap();
94 (
95 arrow::compute::min(ts).unwrap(),
97 arrow::compute::max(ts).unwrap(),
98 )
99 }
100 TimeUnit::Nanosecond => {
101 let ts = ts
102 .as_any()
103 .downcast_ref::<TimestampNanosecondArray>()
104 .unwrap();
105 (
106 arrow::compute::min(ts).unwrap(),
108 arrow::compute::max(ts).unwrap(),
109 )
110 }
111 };
112
113 let part = BulkPart {
114 batch,
115 max_ts,
116 min_ts,
117 sequence: 0,
118 timestamp_index: ts_index,
119 raw_data: Some(request.raw_data),
120 };
121 pending_bulk_request.push(SenderBulkRequest {
122 sender,
123 request: part,
124 region_id: request.region_id,
125 region_metadata,
126 });
127 }
128}