1use std::collections::VecDeque;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::time::Instant;
20
21use common_error::ext::BoxedError;
22use common_recordbatch::error::ExternalSnafu;
23use common_recordbatch::{DfRecordBatch, RecordBatch};
24use futures::stream::BoxStream;
25use futures::{Stream, StreamExt};
26use snafu::ResultExt;
27
28use crate::cache::CacheStrategy;
29use crate::error::Result;
30use crate::read::Batch;
31use crate::read::projection::ProjectionMapper;
32use crate::read::scan_util::PartitionMetrics;
33use crate::read::series_scan::SeriesBatch;
34
35pub enum ScanBatch {
37 Normal(Batch),
38 Series(SeriesBatch),
39 RecordBatch(DfRecordBatch),
40}
41
42pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
43
44pub(crate) struct ConvertBatchStream {
46 inner: ScanBatchStream,
47 projection_mapper: Arc<ProjectionMapper>,
48 cache_strategy: CacheStrategy,
49 partition_metrics: PartitionMetrics,
50 pending: VecDeque<RecordBatch>,
51}
52
53impl ConvertBatchStream {
54 pub(crate) fn new(
55 inner: ScanBatchStream,
56 projection_mapper: Arc<ProjectionMapper>,
57 cache_strategy: CacheStrategy,
58 partition_metrics: PartitionMetrics,
59 ) -> Self {
60 Self {
61 inner,
62 projection_mapper,
63 cache_strategy,
64 partition_metrics,
65 pending: VecDeque::new(),
66 }
67 }
68
69 fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
70 match batch {
71 ScanBatch::Normal(batch) => {
72 let mapper = self.projection_mapper.as_primary_key().unwrap();
74
75 if batch.is_empty() {
76 Ok(mapper.empty_record_batch())
77 } else {
78 mapper.convert(&batch, &self.cache_strategy)
79 }
80 }
81 ScanBatch::Series(series) => {
82 debug_assert!(
83 self.pending.is_empty(),
84 "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist"
85 );
86
87 match series {
88 SeriesBatch::PrimaryKey(primary_key_batch) => {
89 let mapper = self.projection_mapper.as_primary_key().unwrap();
91
92 for batch in primary_key_batch.batches {
93 self.pending
94 .push_back(mapper.convert(&batch, &self.cache_strategy)?);
95 }
96 }
97 SeriesBatch::Flat(flat_batch) => {
98 let mapper = self.projection_mapper.as_flat().unwrap();
100
101 for batch in flat_batch.batches {
102 self.pending
103 .push_back(mapper.convert(&batch, &self.cache_strategy)?);
104 }
105 }
106 }
107
108 let output_schema = self.projection_mapper.output_schema();
109 Ok(self
110 .pending
111 .pop_front()
112 .unwrap_or_else(|| RecordBatch::new_empty(output_schema)))
113 }
114 ScanBatch::RecordBatch(df_record_batch) => {
115 let mapper = self.projection_mapper.as_flat().unwrap();
117
118 mapper.convert(&df_record_batch, &self.cache_strategy)
119 }
120 }
121 }
122}
123
124impl Stream for ConvertBatchStream {
125 type Item = common_recordbatch::error::Result<RecordBatch>;
126
127 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128 if let Some(batch) = self.pending.pop_front() {
129 return Poll::Ready(Some(Ok(batch)));
130 }
131
132 let batch = futures::ready!(self.inner.poll_next_unpin(cx));
133 let Some(batch) = batch else {
134 return Poll::Ready(None);
135 };
136
137 let record_batch = match batch {
138 Ok(batch) => {
139 let start = Instant::now();
140 let record_batch = self.convert(batch);
141 self.partition_metrics
142 .inc_convert_batch_cost(start.elapsed());
143 record_batch
144 }
145 Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
146 };
147 Poll::Ready(Some(record_batch))
148 }
149}