1use std::collections::VecDeque;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::time::Instant;
20
21use common_error::ext::BoxedError;
22use common_recordbatch::error::ExternalSnafu;
23use common_recordbatch::{DfRecordBatch, RecordBatch};
24use futures::stream::BoxStream;
25use futures::{Stream, StreamExt};
26use snafu::ResultExt;
27
28use crate::cache::CacheStrategy;
29use crate::error::Result;
30use crate::read::projection::ProjectionMapper;
31use crate::read::scan_util::PartitionMetrics;
32use crate::read::series_scan::SeriesBatch;
33
34pub enum ScanBatch {
36 Series(SeriesBatch),
37 RecordBatch(DfRecordBatch),
38}
39
40pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
41
42pub(crate) struct ConvertBatchStream {
44 inner: ScanBatchStream,
45 projection_mapper: Arc<ProjectionMapper>,
46 #[allow(dead_code)]
47 cache_strategy: CacheStrategy,
48 partition_metrics: PartitionMetrics,
49 pending: VecDeque<RecordBatch>,
50}
51
52impl ConvertBatchStream {
53 pub(crate) fn new(
54 inner: ScanBatchStream,
55 projection_mapper: Arc<ProjectionMapper>,
56 cache_strategy: CacheStrategy,
57 partition_metrics: PartitionMetrics,
58 ) -> Self {
59 Self {
60 inner,
61 projection_mapper,
62 cache_strategy,
63 partition_metrics,
64 pending: VecDeque::new(),
65 }
66 }
67
68 fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
69 match batch {
70 ScanBatch::Series(series) => {
71 debug_assert!(
72 self.pending.is_empty(),
73 "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
74 );
75
76 let SeriesBatch::Flat(flat_batch) = series;
77 let mapper = self.projection_mapper.as_flat().unwrap();
79
80 for batch in flat_batch.batches {
81 self.pending
82 .push_back(mapper.convert(&batch, &self.cache_strategy)?);
83 }
84
85 let output_schema = self.projection_mapper.output_schema();
86 Ok(self
87 .pending
88 .pop_front()
89 .unwrap_or_else(|| RecordBatch::new_empty(output_schema)))
90 }
91 ScanBatch::RecordBatch(df_record_batch) => {
92 let mapper = self.projection_mapper.as_flat().unwrap();
94
95 mapper.convert(&df_record_batch, &self.cache_strategy)
96 }
97 }
98 }
99}
100
101impl Stream for ConvertBatchStream {
102 type Item = common_recordbatch::error::Result<RecordBatch>;
103
104 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105 if let Some(batch) = self.pending.pop_front() {
106 return Poll::Ready(Some(Ok(batch)));
107 }
108
109 let batch = futures::ready!(self.inner.poll_next_unpin(cx));
110 let Some(batch) = batch else {
111 return Poll::Ready(None);
112 };
113
114 let record_batch = match batch {
115 Ok(batch) => {
116 let start = Instant::now();
117 let record_batch = self.convert(batch);
118 self.partition_metrics
119 .inc_convert_batch_cost(start.elapsed());
120 record_batch
121 }
122 Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
123 };
124 Poll::Ready(Some(record_batch))
125 }
126}