mito2/worker/handle_bulk_insert.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handles bulk insert requests.

use datatypes::arrow;
use datatypes::arrow::array::{
    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::datatypes::{DataType, TimeUnit};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::RegionBulkInsertsRequest;

use crate::memtable::bulk::part::BulkPart;
use crate::request::{OptionOutputTx, SenderBulkRequest};
use crate::worker::RegionWorkerLoop;
use crate::{error, metrics};

impl<S: LogStore> RegionWorkerLoop<S> {
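    /// Handles a single bulk insert request for a region.
    ///
    /// Validates that the batch carries the region's time index column,
    /// computes the batch's timestamp range, and queues the request in
    /// `pending_bulk_request` for the worker to write.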
    pub(crate) async fn handle_bulk_insert_batch(
        &mut self,
        region_metadata: RegionMetadataRef,
        request: RegionBulkInsertsRequest,
        pending_bulk_request: &mut Vec<SenderBulkRequest>,
        sender: OptionOutputTx,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["process_bulk_req"])
            .start_timer();
        let batch = request.payload;

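        // Locate the region's time index column in the batch schema;
        // reject the request as invalid if it is missing.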
        let Some((ts_index, ts)) = batch
            .schema()
            .column_with_name(&region_metadata.time_index_column().column_schema.name)
            .map(|(index, _)| (index, batch.column(index)))
        else {
            sender.send(
                error::InvalidRequestSnafu {
                    region_id: region_metadata.region_id,
                    reason: format!(
                        "timestamp column `{}` not found",
                        region_metadata.time_index_column().column_schema.name
                    ),
                }
                .fail(),
            );
            return;
        };

        let DataType::Timestamp(unit, _) = ts.data_type() else {
            // safety: ts data type must be a timestamp type.
            unreachable!()
        };

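        // Compute the min/max timestamps of the batch; `BulkPart` records
        // this range alongside the data.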
        let (min_ts, max_ts) = match unit {
            TimeUnit::Second => {
                let ts = ts.as_any().downcast_ref::<TimestampSecondArray>().unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
            TimeUnit::Millisecond => {
                let ts = ts
                    .as_any()
                    .downcast_ref::<TimestampMillisecondArray>()
                    .unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
            TimeUnit::Microsecond => {
                let ts = ts
                    .as_any()
                    .downcast_ref::<TimestampMicrosecondArray>()
                    .unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
            TimeUnit::Nanosecond => {
                let ts = ts
                    .as_any()
                    .downcast_ref::<TimestampNanosecondArray>()
                    .unwrap();
                (
                    // safety: ts array must contain at least one row so this won't return None.
                    arrow::compute::min(ts).unwrap(),
                    arrow::compute::max(ts).unwrap(),
                )
            }
        };

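        // Wrap the batch into a `BulkPart` with its timestamp range and queue
        // it together with the response sender for the worker to process.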
        let part = BulkPart {
            batch,
            max_ts,
            min_ts,
            sequence: 0,
            timestamp_index: ts_index,
            raw_data: Some(request.raw_data),
        };
        pending_bulk_request.push(SenderBulkRequest {
            sender,
            request: part,
            region_id: request.region_id,
            region_metadata,
        });
    }
}