mito2/worker/handle_bulk_insert.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handles bulk insert requests.

use datatypes::arrow;
use datatypes::arrow::array::{
    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::RegionBulkInsertsRequest;

use crate::memtable::bulk::part::BulkPart;
use crate::request::{OptionOutputTx, SenderBulkRequest};
use crate::worker::RegionWorkerLoop;
use crate::{error, metrics};

impl<S: LogStore> RegionWorkerLoop<S> {
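    /// Handles a bulk insert request for a region.
    ///
    /// Locates the time index column in the record batch, computes its min/max
    /// timestamps, wraps the batch into a [`BulkPart`], and queues it in
    /// `pending_bulk_request` together with its response sender.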
    pub(crate) async fn handle_bulk_insert_batch(
        &mut self,
        region_metadata: RegionMetadataRef,
        request: RegionBulkInsertsRequest,
        pending_bulk_request: &mut Vec<SenderBulkRequest>,
        sender: OptionOutputTx,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["process_bulk_req"])
            .start_timer();
        let batch = request.payload;
        let num_rows = batch.num_rows();

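        // Find the region's time index column in the batch schema by name; reply
        // with an invalid-request error if the batch does not carry it.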
        let Some((ts_index, ts)) = batch
            .schema()
            .column_with_name(&region_metadata.time_index_column().column_schema.name)
            .map(|(index, _)| (index, batch.column(index)))
        else {
            sender.send(
                error::InvalidRequestSnafu {
                    region_id: region_metadata.region_id,
                    reason: format!(
                        "timestamp column `{}` not found",
                        region_metadata.time_index_column().column_schema.name
                    ),
                }
                .fail(),
            );
            return;
        };

        let DataType::Timestamp(unit, _) = ts.data_type() else {
            // safety: ts data type must be a timestamp type.
            unreachable!()
        };

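        // Compute the min/max timestamps of the batch by downcasting the column to
        // the concrete timestamp array type for its time unit.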
        let (min_ts, max_ts) = match unit {
            TimeUnit::Second => {
                let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
            TimeUnit::Millisecond => {
                let ts = ts
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
            TimeUnit::Microsecond => {
                let ts = ts
                    .as_any()
                    .downcast_ref::<TimestampMicrosecondArray>()
                    .unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
            TimeUnit::Nanosecond => {
                let ts = ts
                    .as_any()
                    .downcast_ref::<TimestampNanosecondArray>()
                    .unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
        };

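        // Wrap the batch into a `BulkPart`. The sequence is left as 0 here;
        // presumably a real sequence number is assigned when the part is written.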
        let part = BulkPart {
            batch,
            num_rows,
            max_ts,
            min_ts,
            sequence: 0,
            timestamp_index: ts_index,
        };
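        // Queue the request so the worker loop can process it together with the
        // other pending bulk requests.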
        pending_bulk_request.push(SenderBulkRequest {
            sender,
            request: part,
            region_id: request.region_id,
            region_metadata,
        });
    }
}