use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
    scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
    PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{scan_util, Batch, ScannerMetrics};

/// Scans a region without providing any output ordering guarantee.
///
/// This scanner is typically used for append-only regions where the output
/// does not need to be sorted or deduplicated.
pub struct UnorderedScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context shared by the partition streams.
    stream_ctx: Arc<StreamContext>,
    /// Metrics for each partition.
    metrics_list: PartitionMetricsList,
}

impl UnorderedScan {
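    /// Creates a new [UnorderedScan] from the scan input.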
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            metrics_list: PartitionMetricsList::default(),
        }
    }

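    /// Scans the region and returns a single stream that yields the record
    /// batches of every partition, one partition after another.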
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_num = self.properties.num_partitions();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let stream = stream! {
            for mut stream in streams {
                while let Some(rb) = stream.next().await {
                    yield rb;
                }
            }
        };
        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.schema(),
            Box::pin(stream),
        ));
        Ok(stream)
    }

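    /// Scans the partition range identified by `part_range_id` and yields [Batch]es
    /// from its memtable ranges, file ranges, and any other ranges.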
    fn scan_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        range_builder_list: Arc<RangeBuilderList>,
    ) -> impl Stream<Item = Result<Batch>> {
        try_stream! {
            let range_meta = &stream_ctx.ranges[part_range_id];
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    let stream = scan_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        range_meta.time_range,
                    );
                    for await batch in stream {
                        yield batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    let stream = scan_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        range_builder_list.clone(),
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                } else {
                    let stream = scan_util::maybe_scan_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                }
            }
        }
    }

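    /// Same as [Self::scan_partition_range] but yields flat [RecordBatch]es instead
    /// of [Batch]es.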
    fn scan_flat_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        range_builder_list: Arc<RangeBuilderList>,
    ) -> impl Stream<Item = Result<RecordBatch>> {
        try_stream! {
            let range_meta = &stream_ctx.ranges[part_range_id];
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    let stream = scan_flat_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                    );
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    let stream = scan_flat_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        range_builder_list.clone(),
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else {
                    let stream = scan_util::maybe_scan_flat_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                }
            }
        }
    }

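    /// Scans all partitions sequentially and returns a single [ScanBatchStream]
    /// that chains the per-partition streams.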
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.partition_metrics(false, partition, &metrics_set);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

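    /// Creates the [PartitionMetrics] for a partition and registers it in the
    /// metrics list so verbose explain output can report it later.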
    fn partition_metrics(
        &self,
        explain_verbose: bool,
        partition: usize,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> PartitionMetrics {
        let part_metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            "UnorderedScan",
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );
        self.metrics_list.set(partition, part_metrics.clone());
        part_metrics
    }

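    /// Scans one partition and wraps the batches into a [SendableRecordBatchStream]
    /// with the output schema of the scanner.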
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "UnorderedScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
        let input = &self.stream_ctx.input;

        let batch_stream = if input.flat_format {
            self.scan_flat_batch_in_partition(partition, metrics.clone())?
        } else {
            self.scan_batch_in_partition(partition, metrics.clone())?
        };

        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

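    /// Scans the given partition and returns a stream of [ScanBatch]es built
    /// from [Batch]es.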
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let distinguish_range = self.properties.distinguish_partition_range;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part range assigned to this partition.
            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let _mapper = &stream_ctx.input.mapper;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                let stream = Self::scan_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    range_builder_list.clone(),
                );
                for await batch in stream {
                    let batch = batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "UnorderedScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty batch to mark the end of this part range when the
                // caller needs to distinguish partition ranges.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

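    /// Same as [Self::scan_batch_in_partition] but yields flat [RecordBatch]es
    /// wrapped in [ScanBatch::RecordBatch].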
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();

                let stream = Self::scan_flat_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    range_builder_list.clone(),
                );
                for await record_batch in stream {
                    let record_batch = record_batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
}

impl RegionScanner for UnorderedScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);
        Ok(())
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for UnorderedScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "UnorderedScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for UnorderedScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UnorderedScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl UnorderedScan {
    /// Returns the scan input. Only used in tests.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}