mito2/worker/handle_bulk_insert.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handles bulk insert requests.

use datatypes::arrow;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::RegionBulkInsertsRequest;

use crate::error::InconsistentTimestampLengthSnafu;
use crate::memtable::bulk::part::BulkPart;
use crate::request::{OptionOutputTx, SenderBulkRequest};
use crate::worker::RegionWorkerLoop;
use crate::{error, metrics};

impl<S: LogStore> RegionWorkerLoop<S> {
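    /// Handles a bulk insert request: validates the payload's timestamp column,
    /// computes its min/max timestamps, and queues the batch as a pending bulk request.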
    pub(crate) async fn handle_bulk_insert_batch(
        &mut self,
        region_metadata: RegionMetadataRef,
        request: RegionBulkInsertsRequest,
        pending_bulk_request: &mut Vec<SenderBulkRequest>,
        sender: OptionOutputTx,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["process_bulk_req"])
            .start_timer();
        let batch = request.payload;
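        // An empty payload requires no write; reply with zero affected rows immediately.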
        if batch.num_rows() == 0 {
            sender.send(Ok(0));
            return;
        }

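        // Locate the region's time index column in the batch schema; reject the request if it is missing.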
        let Some((ts_index, ts)) = batch
            .schema()
            .column_with_name(&region_metadata.time_index_column().column_schema.name)
            .map(|(index, _)| (index, batch.column(index)))
        else {
            sender.send(
                error::InvalidRequestSnafu {
                    region_id: region_metadata.region_id,
                    reason: format!(
                        "timestamp column `{}` not found",
                        region_metadata.time_index_column().column_schema.name
                    ),
                }
                .fail(),
            );
            return;
        };

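        // The timestamp column must have exactly one value per row in the batch.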
        if batch.num_rows() != ts.len() {
            sender.send(
                InconsistentTimestampLengthSnafu {
                    expected: batch.num_rows(),
                    actual: ts.len(),
                }
                .fail(),
            );
            return;
        }

        // safety: ts data type must be a timestamp type.
        let (ts_primitive, _) = datatypes::timestamp::timestamp_array_to_primitive(ts).unwrap();

        // safety: we've checked ts.len() == batch.num_rows() and batch is not empty
        let min_ts = arrow::compute::min(&ts_primitive).unwrap();
        let max_ts = arrow::compute::max(&ts_primitive).unwrap();

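        // Assemble the bulk part: the sequence is initialized to 0 here and the raw payload is kept alongside the batch.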
        let part = BulkPart {
            batch,
            max_ts,
            min_ts,
            sequence: 0,
            timestamp_index: ts_index,
            raw_data: Some(request.raw_data),
        };
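        // Defer the actual write: queue the request so the worker loop can process it with other pending bulk requests.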
        pending_bulk_request.push(SenderBulkRequest {
            sender,
            request: part,
            region_id: request.region_id,
            region_metadata,
        });
    }
}