mito2/read/stream.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::compute;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use snafu::ResultExt;

use crate::cache::CacheStrategy;
use crate::error::Result;
use crate::read::Batch;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_util::PartitionMetrics;
use crate::read::series_scan::SeriesBatch;

/// All kinds of [`Batch`]es produced by the scanner.
pub enum ScanBatch {
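    /// A [`Batch`] in the primary key format.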
    Normal(Batch),
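    /// A [`SeriesBatch`] produced by the series scan.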
    Series(SeriesBatch),
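    /// A [`DfRecordBatch`] in the flat format.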
    RecordBatch(DfRecordBatch),
}

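/// A boxed stream of [`ScanBatch`]es.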
pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;

/// A stream that converts [`ScanBatch`]es into [`RecordBatch`]es.
pub(crate) struct ConvertBatchStream {
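    /// The inner stream of [`ScanBatch`]es.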
    inner: ScanBatchStream,
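    /// Mapper that converts batches into record batches with the output schema.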
    projection_mapper: Arc<ProjectionMapper>,
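    /// Cache to use during conversion.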
    cache_strategy: CacheStrategy,
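    /// Metrics of the partition, used to record the conversion cost.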
    partition_metrics: PartitionMetrics,
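    /// Reusable buffer for record batches converted from a [`SeriesBatch`].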
    buffer: Vec<DfRecordBatch>,
}

impl ConvertBatchStream {
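    /// Creates a new [`ConvertBatchStream`].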
    pub(crate) fn new(
        inner: ScanBatchStream,
        projection_mapper: Arc<ProjectionMapper>,
        cache_strategy: CacheStrategy,
        partition_metrics: PartitionMetrics,
    ) -> Self {
        Self {
            inner,
            projection_mapper,
            cache_strategy,
            partition_metrics,
            buffer: Vec::new(),
        }
    }

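    /// Converts a [`ScanBatch`] into a [`RecordBatch`].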
    fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
        match batch {
            ScanBatch::Normal(batch) => {
                // Safety: Only primary key format returns this batch.
                let mapper = self.projection_mapper.as_primary_key().unwrap();

                if batch.is_empty() {
                    Ok(mapper.empty_record_batch())
                } else {
                    mapper.convert(&batch, &self.cache_strategy)
                }
            }
            ScanBatch::Series(series) => {
                self.buffer.clear();

                match series {
                    SeriesBatch::PrimaryKey(primary_key_batch) => {
                        self.buffer.reserve(primary_key_batch.batches.len());
                        // Safety: Only primary key format returns this batch.
                        let mapper = self.projection_mapper.as_primary_key().unwrap();

                        for batch in primary_key_batch.batches {
                            let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
                            self.buffer.push(record_batch.into_df_record_batch());
                        }
                    }
                    SeriesBatch::Flat(flat_batch) => {
                        self.buffer.reserve(flat_batch.batches.len());
                        // Safety: Only flat format returns this batch.
                        let mapper = self.projection_mapper.as_flat().unwrap();

                        for batch in flat_batch.batches {
                            let record_batch = mapper.convert(&batch)?;
                            self.buffer.push(record_batch.into_df_record_batch());
                        }
                    }
                }

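                // Concatenate the converted batches of the series into one record batch.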
                let output_schema = self.projection_mapper.output_schema();
                let record_batch =
                    compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
                        .context(ArrowComputeSnafu)?;

                RecordBatch::try_from_df_record_batch(output_schema, record_batch)
            }
            ScanBatch::RecordBatch(df_record_batch) => {
                // Safety: Only flat format returns this batch.
                let mapper = self.projection_mapper.as_flat().unwrap();

                mapper.convert(&df_record_batch)
            }
        }
    }
}

impl Stream for ConvertBatchStream {
    type Item = common_recordbatch::error::Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let batch = futures::ready!(self.inner.poll_next_unpin(cx));
        let Some(batch) = batch else {
            return Poll::Ready(None);
        };

        let record_batch = match batch {
            Ok(batch) => {
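                // Track the time spent on converting this batch.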
                let start = Instant::now();
                let record_batch = self.convert(batch);
                self.partition_metrics
                    .inc_convert_batch_cost(start.elapsed());
                record_batch
            }
            Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
        };
        Poll::Ready(Some(record_batch))
    }
}