1use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use std::time::Instant;
19
20use common_error::ext::BoxedError;
21use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
22use common_recordbatch::{DfRecordBatch, RecordBatch};
23use datatypes::compute;
24use futures::stream::BoxStream;
25use futures::{Stream, StreamExt};
26use snafu::ResultExt;
27
28use crate::cache::CacheStrategy;
29use crate::error::Result;
30use crate::read::projection::ProjectionMapper;
31use crate::read::scan_util::PartitionMetrics;
32use crate::read::series_scan::SeriesBatch;
33use crate::read::Batch;
34
/// A unit of scan output that can be converted into a [`RecordBatch`].
pub enum ScanBatch {
    /// A single [`Batch`], converted on its own.
    Normal(Batch),
    /// A [`SeriesBatch`]; the batches it carries are converted one by one and
    /// then concatenated into a single output record batch.
    Series(SeriesBatch),
}
40
/// A boxed, `'static` stream yielding `Result<ScanBatch>` items.
pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
42
/// A stream adapter that pulls [`ScanBatch`]es from an inner stream and yields
/// them converted to [`RecordBatch`]es.
pub(crate) struct ConvertBatchStream {
    /// Upstream source of scan batches.
    inner: ScanBatchStream,
    /// Mapper used to convert each batch and to obtain the output schema.
    projection_mapper: Arc<ProjectionMapper>,
    /// Cache strategy handed to the projection mapper on every conversion.
    cache_strategy: CacheStrategy,
    /// Metrics sink that receives the time spent converting batches.
    partition_metrics: PartitionMetrics,
    /// Scratch buffer reused across `ScanBatch::Series` conversions to hold the
    /// per-batch results before they are concatenated.
    buffer: Vec<DfRecordBatch>,
}
51
52impl ConvertBatchStream {
53 pub(crate) fn new(
54 inner: ScanBatchStream,
55 projection_mapper: Arc<ProjectionMapper>,
56 cache_strategy: CacheStrategy,
57 partition_metrics: PartitionMetrics,
58 ) -> Self {
59 Self {
60 inner,
61 projection_mapper,
62 cache_strategy,
63 partition_metrics,
64 buffer: Vec::new(),
65 }
66 }
67
68 fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
69 match batch {
70 ScanBatch::Normal(batch) => {
71 if batch.is_empty() {
72 Ok(self.projection_mapper.empty_record_batch())
73 } else {
74 self.projection_mapper.convert(&batch, &self.cache_strategy)
75 }
76 }
77 ScanBatch::Series(series) => {
78 self.buffer.clear();
79 self.buffer.reserve(series.batches.len());
80
81 for batch in series.batches {
82 let record_batch = self
83 .projection_mapper
84 .convert(&batch, &self.cache_strategy)?;
85 self.buffer.push(record_batch.into_df_record_batch());
86 }
87
88 let output_schema = self.projection_mapper.output_schema();
89 let record_batch =
90 compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
91 .context(ArrowComputeSnafu)?;
92
93 RecordBatch::try_from_df_record_batch(output_schema, record_batch)
94 }
95 }
96 }
97}
98
99impl Stream for ConvertBatchStream {
100 type Item = common_recordbatch::error::Result<RecordBatch>;
101
102 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
103 let batch = futures::ready!(self.inner.poll_next_unpin(cx));
104 let Some(batch) = batch else {
105 return Poll::Ready(None);
106 };
107
108 let record_batch = match batch {
109 Ok(batch) => {
110 let start = Instant::now();
111 let record_batch = self.convert(batch);
112 self.partition_metrics
113 .inc_convert_batch_cost(start.elapsed());
114 record_batch
115 }
116 Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
117 };
118 Poll::Ready(Some(record_batch))
119 }
120}