1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
26use datatypes::schema::SchemaRef;
27use futures::{Stream, StreamExt};
28use snafu::ensure;
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
31
32use crate::error::{PartitionOutOfRangeSnafu, Result};
33use crate::read::range::RangeBuilderList;
34use crate::read::scan_region::{ScanInput, StreamContext};
35use crate::read::scan_util::{
36 scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
37};
38use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
39use crate::read::{scan_util, Batch, ScannerMetrics};
40
41pub struct UnorderedScan {
45 properties: ScannerProperties,
47 stream_ctx: Arc<StreamContext>,
49 metrics_list: PartitionMetricsList,
51}
52
53impl UnorderedScan {
54 pub(crate) fn new(input: ScanInput) -> Self {
56 let mut properties = ScannerProperties::default()
57 .with_append_mode(input.append_mode)
58 .with_total_rows(input.total_rows());
59 let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
60 properties.partitions = vec![stream_ctx.partition_ranges()];
61
62 Self {
63 properties,
64 stream_ctx,
65 metrics_list: PartitionMetricsList::default(),
66 }
67 }
68
69 pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
71 let metrics_set = ExecutionPlanMetricsSet::new();
72 let part_num = self.properties.num_partitions();
73 let streams = (0..part_num)
74 .map(|i| self.scan_partition(&metrics_set, i))
75 .collect::<Result<Vec<_>, BoxedError>>()?;
76 let stream = stream! {
77 for mut stream in streams {
78 while let Some(rb) = stream.next().await {
79 yield rb;
80 }
81 }
82 };
83 let stream = Box::pin(RecordBatchStreamWrapper::new(
84 self.schema(),
85 Box::pin(stream),
86 ));
87 Ok(stream)
88 }
89
90 fn scan_partition_range(
92 stream_ctx: Arc<StreamContext>,
93 part_range_id: usize,
94 part_metrics: PartitionMetrics,
95 range_builder_list: Arc<RangeBuilderList>,
96 ) -> impl Stream<Item = Result<Batch>> {
97 try_stream! {
98 let range_meta = &stream_ctx.ranges[part_range_id];
100 for index in &range_meta.row_group_indices {
101 if stream_ctx.is_mem_range_index(*index) {
102 let stream = scan_mem_ranges(
103 stream_ctx.clone(),
104 part_metrics.clone(),
105 *index,
106 range_meta.time_range,
107 );
108 for await batch in stream {
109 yield batch?;
110 }
111 } else if stream_ctx.is_file_range_index(*index) {
112 let stream = scan_file_ranges(
113 stream_ctx.clone(),
114 part_metrics.clone(),
115 *index,
116 "unordered_scan_files",
117 range_builder_list.clone(),
118 ).await?;
119 for await batch in stream {
120 yield batch?;
121 }
122 } else {
123 let stream = scan_util::maybe_scan_other_ranges(
124 &stream_ctx,
125 *index,
126 &part_metrics,
127 ).await?;
128 for await batch in stream {
129 yield batch?;
130 }
131 }
132 }
133 }
134 }
135
136 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
138 let metrics_set = ExecutionPlanMetricsSet::new();
139
140 let streams = (0..self.properties.partitions.len())
141 .map(|partition| {
142 let metrics = self.partition_metrics(partition, &metrics_set);
143 self.scan_batch_in_partition(partition, metrics)
144 })
145 .collect::<Result<Vec<_>>>()?;
146
147 Ok(Box::pin(futures::stream::iter(streams).flatten()))
148 }
149
150 fn partition_metrics(
151 &self,
152 partition: usize,
153 metrics_set: &ExecutionPlanMetricsSet,
154 ) -> PartitionMetrics {
155 let part_metrics = PartitionMetrics::new(
156 self.stream_ctx.input.mapper.metadata().region_id,
157 partition,
158 "UnorderedScan",
159 self.stream_ctx.query_start,
160 metrics_set,
161 );
162 self.metrics_list.set(partition, part_metrics.clone());
163 part_metrics
164 }
165
166 fn scan_partition_impl(
167 &self,
168 metrics_set: &ExecutionPlanMetricsSet,
169 partition: usize,
170 ) -> Result<SendableRecordBatchStream> {
171 let metrics = self.partition_metrics(partition, metrics_set);
172
173 let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
174
175 let input = &self.stream_ctx.input;
176 let record_batch_stream = ConvertBatchStream::new(
177 batch_stream,
178 input.mapper.clone(),
179 input.cache_strategy.clone(),
180 metrics,
181 );
182
183 Ok(Box::pin(RecordBatchStreamWrapper::new(
184 input.mapper.output_schema(),
185 Box::pin(record_batch_stream),
186 )))
187 }
188
189 fn scan_batch_in_partition(
190 &self,
191 partition: usize,
192 part_metrics: PartitionMetrics,
193 ) -> Result<ScanBatchStream> {
194 ensure!(
195 partition < self.properties.partitions.len(),
196 PartitionOutOfRangeSnafu {
197 given: partition,
198 all: self.properties.partitions.len(),
199 }
200 );
201
202 let stream_ctx = self.stream_ctx.clone();
203 let part_ranges = self.properties.partitions[partition].clone();
204 let distinguish_range = self.properties.distinguish_partition_range;
205
206 let stream = try_stream! {
207 part_metrics.on_first_poll();
208
209 let range_builder_list = Arc::new(RangeBuilderList::new(
210 stream_ctx.input.num_memtables(),
211 stream_ctx.input.num_files(),
212 ));
213 for part_range in part_ranges {
215 let mut metrics = ScannerMetrics::default();
216 let mut fetch_start = Instant::now();
217 #[cfg(debug_assertions)]
218 let mut checker = crate::read::BatchChecker::default()
219 .with_start(Some(part_range.start))
220 .with_end(Some(part_range.end));
221
222 let stream = Self::scan_partition_range(
223 stream_ctx.clone(),
224 part_range.identifier,
225 part_metrics.clone(),
226 range_builder_list.clone(),
227 );
228 for await batch in stream {
229 let batch = batch?;
230 metrics.scan_cost += fetch_start.elapsed();
231 metrics.num_batches += 1;
232 metrics.num_rows += batch.num_rows();
233
234 debug_assert!(!batch.is_empty());
235 if batch.is_empty() {
236 continue;
237 }
238
239 #[cfg(debug_assertions)]
240 checker.ensure_part_range_batch(
241 "UnorderedScan",
242 stream_ctx.input.mapper.metadata().region_id,
243 partition,
244 part_range,
245 &batch,
246 );
247
248 let yield_start = Instant::now();
249 yield ScanBatch::Normal(batch);
250 metrics.yield_cost += yield_start.elapsed();
251
252 fetch_start = Instant::now();
253 }
254
255 if distinguish_range {
258 let yield_start = Instant::now();
259 yield ScanBatch::Normal(Batch::empty());
260 metrics.yield_cost += yield_start.elapsed();
261 }
262
263 metrics.scan_cost += fetch_start.elapsed();
264 part_metrics.merge_metrics(&metrics);
265 }
266 };
267 Ok(Box::pin(stream))
268 }
269}
270
271impl RegionScanner for UnorderedScan {
272 fn properties(&self) -> &ScannerProperties {
273 &self.properties
274 }
275
276 fn schema(&self) -> SchemaRef {
277 self.stream_ctx.input.mapper.output_schema()
278 }
279
280 fn metadata(&self) -> RegionMetadataRef {
281 self.stream_ctx.input.mapper.metadata().clone()
282 }
283
284 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
285 self.properties.prepare(request);
286 Ok(())
287 }
288
289 fn scan_partition(
290 &self,
291 metrics_set: &ExecutionPlanMetricsSet,
292 partition: usize,
293 ) -> Result<SendableRecordBatchStream, BoxedError> {
294 self.scan_partition_impl(metrics_set, partition)
295 .map_err(BoxedError::new)
296 }
297
298 fn has_predicate(&self) -> bool {
299 let predicate = self.stream_ctx.input.predicate();
300 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
301 }
302
303 fn set_logical_region(&mut self, logical_region: bool) {
304 self.properties.set_logical_region(logical_region);
305 }
306}
307
308impl DisplayAs for UnorderedScan {
309 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
310 write!(
311 f,
312 "UnorderedScan: region={}, ",
313 self.stream_ctx.input.mapper.metadata().region_id
314 )?;
315 match t {
316 DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
317 DisplayFormatType::Verbose => {
318 self.stream_ctx.format_for_explain(true, f)?;
319 self.metrics_list.format_verbose_metrics(f)
320 }
321 }
322 }
323}
324
325impl fmt::Debug for UnorderedScan {
326 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
327 f.debug_struct("UnorderedScan")
328 .field("num_ranges", &self.stream_ctx.ranges.len())
329 .finish()
330 }
331}
332
#[cfg(test)]
impl UnorderedScan {
    /// Test-only accessor for the [ScanInput] held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}