1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use std::time::Instant;
19
20use common_error::ext::BoxedError;
21use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
22use common_recordbatch::{DfRecordBatch, RecordBatch};
23use datatypes::compute;
24use futures::stream::BoxStream;
25use futures::{Stream, StreamExt};
26use snafu::{OptionExt, ResultExt};
27
28use crate::cache::CacheStrategy;
29use crate::error::{Result, UnexpectedSnafu};
30use crate::read::projection::ProjectionMapper;
31use crate::read::scan_util::PartitionMetrics;
32use crate::read::series_scan::SeriesBatch;
33use crate::read::Batch;
34
/// The kinds of batches a scan stream can yield before they are converted
/// into a [`RecordBatch`] by [`ConvertBatchStream`].
pub enum ScanBatch {
    /// A single [`Batch`]; converted with the primary-key projection mapper.
    Normal(Batch),
    /// A [`SeriesBatch`]; each of its `batches` is converted separately and
    /// the results are concatenated into one output batch.
    Series(SeriesBatch),
    /// A [`DfRecordBatch`]; converted with the flat projection mapper.
    RecordBatch(DfRecordBatch),
}
41
/// A boxed, `'static` stream whose items are `Result<ScanBatch>`.
pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
43
/// A stream adapter that pulls [`ScanBatch`]es from an inner stream and
/// yields them converted to [`RecordBatch`]es.
pub(crate) struct ConvertBatchStream {
    /// Source stream of scan batches.
    inner: ScanBatchStream,
    /// Mapper used to turn scan batches into output record batches.
    projection_mapper: Arc<ProjectionMapper>,
    /// Cache strategy handed to the mapper on every `Batch` conversion.
    cache_strategy: CacheStrategy,
    /// Metrics sink; receives the time spent in each conversion.
    partition_metrics: PartitionMetrics,
    /// Scratch space holding the converted batches of a series while they
    /// are being concatenated; reused across calls.
    buffer: Vec<DfRecordBatch>,
}
52
53impl ConvertBatchStream {
54 pub(crate) fn new(
55 inner: ScanBatchStream,
56 projection_mapper: Arc<ProjectionMapper>,
57 cache_strategy: CacheStrategy,
58 partition_metrics: PartitionMetrics,
59 ) -> Self {
60 Self {
61 inner,
62 projection_mapper,
63 cache_strategy,
64 partition_metrics,
65 buffer: Vec::new(),
66 }
67 }
68
69 fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
70 let mapper = self
71 .projection_mapper
72 .as_primary_key()
73 .context(UnexpectedSnafu {
74 reason: "Unexpected format",
75 })
76 .map_err(|e| BoxedError::new(e) as _)
77 .context(ExternalSnafu)?;
78
79 match batch {
80 ScanBatch::Normal(batch) => {
81 if batch.is_empty() {
82 Ok(mapper.empty_record_batch())
83 } else {
84 mapper.convert(&batch, &self.cache_strategy)
85 }
86 }
87 ScanBatch::Series(series) => {
88 self.buffer.clear();
89 self.buffer.reserve(series.batches.len());
90
91 for batch in series.batches {
92 let record_batch = mapper.convert(&batch, &self.cache_strategy)?;
93 self.buffer.push(record_batch.into_df_record_batch());
94 }
95
96 let output_schema = mapper.output_schema();
97 let record_batch =
98 compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
99 .context(ArrowComputeSnafu)?;
100
101 RecordBatch::try_from_df_record_batch(output_schema, record_batch)
102 }
103 ScanBatch::RecordBatch(df_record_batch) => {
104 let mapper = self.projection_mapper.as_flat().unwrap();
106
107 mapper.convert(&df_record_batch)
108 }
109 }
110 }
111}
112
113impl Stream for ConvertBatchStream {
114 type Item = common_recordbatch::error::Result<RecordBatch>;
115
116 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
117 let batch = futures::ready!(self.inner.poll_next_unpin(cx));
118 let Some(batch) = batch else {
119 return Poll::Ready(None);
120 };
121
122 let record_batch = match batch {
123 Ok(batch) => {
124 let start = Instant::now();
125 let record_batch = self.convert(batch);
126 self.partition_metrics
127 .inc_convert_batch_cost(start.elapsed());
128 record_batch
129 }
130 Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
131 };
132 Poll::Ready(Some(record_batch))
133 }
134}