// mito2/read/stream.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{DfRecordBatch, RecordBatch};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use snafu::ResultExt;

use crate::cache::CacheStrategy;
use crate::error::Result;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_util::PartitionMetrics;
use crate::read::series_scan::SeriesBatch;

34/// All kinds of [`Batch`]es to produce in scanner.
35pub enum ScanBatch {
36    Series(SeriesBatch),
37    RecordBatch(DfRecordBatch),
38}
39
40pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
41
42/// A stream that takes [`ScanBatch`]es and produces (converts them to) [`RecordBatch`]es.
43pub(crate) struct ConvertBatchStream {
44    inner: ScanBatchStream,
45    projection_mapper: Arc<ProjectionMapper>,
46    #[allow(dead_code)]
47    cache_strategy: CacheStrategy,
48    partition_metrics: PartitionMetrics,
49    pending: VecDeque<RecordBatch>,
50}
51
52impl ConvertBatchStream {
53    pub(crate) fn new(
54        inner: ScanBatchStream,
55        projection_mapper: Arc<ProjectionMapper>,
56        cache_strategy: CacheStrategy,
57        partition_metrics: PartitionMetrics,
58    ) -> Self {
59        Self {
60            inner,
61            projection_mapper,
62            cache_strategy,
63            partition_metrics,
64            pending: VecDeque::new(),
65        }
66    }
67
68    fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
69        match batch {
70            ScanBatch::Series(series) => {
71                debug_assert!(
72                    self.pending.is_empty(),
73                    "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
74                );
75
76                let SeriesBatch::Flat(flat_batch) = series;
77                // Safety: Only flat format returns this batch.
78                let mapper = self.projection_mapper.as_flat().unwrap();
79
80                for batch in flat_batch.batches {
81                    self.pending
82                        .push_back(mapper.convert(&batch, &self.cache_strategy)?);
83                }
84
85                let output_schema = self.projection_mapper.output_schema();
86                Ok(self
87                    .pending
88                    .pop_front()
89                    .unwrap_or_else(|| RecordBatch::new_empty(output_schema)))
90            }
91            ScanBatch::RecordBatch(df_record_batch) => {
92                // Safety: Only flat format returns this batch.
93                let mapper = self.projection_mapper.as_flat().unwrap();
94
95                mapper.convert(&df_record_batch, &self.cache_strategy)
96            }
97        }
98    }
99}
100
101impl Stream for ConvertBatchStream {
102    type Item = common_recordbatch::error::Result<RecordBatch>;
103
104    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105        if let Some(batch) = self.pending.pop_front() {
106            return Poll::Ready(Some(Ok(batch)));
107        }
108
109        let batch = futures::ready!(self.inner.poll_next_unpin(cx));
110        let Some(batch) = batch else {
111            return Poll::Ready(None);
112        };
113
114        let record_batch = match batch {
115            Ok(batch) => {
116                let start = Instant::now();
117                let record_batch = self.convert(batch);
118                self.partition_metrics
119                    .inc_convert_batch_cost(start.elapsed());
120                record_batch
121            }
122            Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
123        };
124        Poll::Ready(Some(record_batch))
125    }
126}