1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48 scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49 should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
/// Scanner over a region that is built from a [`ScanInput`].
///
/// It exposes the data as one or more partitions, each of which holds a list of
/// [`PartitionRange`]s, and it can also build merged readers for compaction.
pub struct SeqScan {
    /// Properties of the scanner, including the partition ranges assigned to each partition.
    properties: ScannerProperties,
    /// Context shared by every stream created from this scanner.
    stream_ctx: Arc<StreamContext>,
    /// Metrics of each partition. Only filled for non-compaction scans and printed by the
    /// verbose display format.
    metrics_list: PartitionMetricsList,
}
71
72impl SeqScan {
73 pub(crate) fn new(input: ScanInput) -> Self {
76 let mut properties = ScannerProperties::default()
77 .with_append_mode(input.append_mode)
78 .with_total_rows(input.total_rows());
79 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80 properties.partitions = vec![stream_ctx.partition_ranges()];
81
82 Self {
83 properties,
84 stream_ctx,
85 metrics_list: PartitionMetricsList::default(),
86 }
87 }
88
89 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
94 let metrics_set = ExecutionPlanMetricsSet::new();
95 let streams = (0..self.properties.partitions.len())
96 .map(|partition: usize| {
97 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
98 })
99 .collect::<Result<Vec<_>, _>>()?;
100
101 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
102 Ok(Box::pin(aggr_stream))
103 }
104
105 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
107 let metrics_set = ExecutionPlanMetricsSet::new();
108
109 let streams = (0..self.properties.partitions.len())
110 .map(|partition| {
111 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
112 self.scan_batch_in_partition(partition, metrics)
113 })
114 .collect::<Result<Vec<_>>>()?;
115
116 Ok(Box::pin(futures::stream::iter(streams).flatten()))
117 }
118
119 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
124 assert!(self.stream_ctx.input.compaction);
125
126 let metrics_set = ExecutionPlanMetricsSet::new();
127 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
128 debug_assert_eq!(1, self.properties.partitions.len());
129 let partition_ranges = &self.properties.partitions[0];
130
131 let reader = Self::merge_all_ranges_for_compaction(
132 &self.stream_ctx,
133 partition_ranges,
134 &part_metrics,
135 )
136 .await?;
137 Ok(Box::new(reader))
138 }
139
140 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
145 assert!(self.stream_ctx.input.compaction);
146
147 let metrics_set = ExecutionPlanMetricsSet::new();
148 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
149 debug_assert_eq!(1, self.properties.partitions.len());
150 let partition_ranges = &self.properties.partitions[0];
151
152 let reader = Self::merge_all_flat_ranges_for_compaction(
153 &self.stream_ctx,
154 partition_ranges,
155 &part_metrics,
156 )
157 .await?;
158 Ok(reader)
159 }
160
161 async fn merge_all_ranges_for_compaction(
164 stream_ctx: &Arc<StreamContext>,
165 partition_ranges: &[PartitionRange],
166 part_metrics: &PartitionMetrics,
167 ) -> Result<BoxedBatchReader> {
168 let mut sources = Vec::new();
169 let range_builder_list = Arc::new(RangeBuilderList::new(
170 stream_ctx.input.num_memtables(),
171 stream_ctx.input.num_files(),
172 ));
173 for part_range in partition_ranges {
174 build_sources(
175 stream_ctx,
176 part_range,
177 true,
178 part_metrics,
179 range_builder_list.clone(),
180 &mut sources,
181 None,
182 )
183 .await?;
184 }
185
186 common_telemetry::debug!(
187 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
188 stream_ctx.input.mapper.metadata().region_id,
189 partition_ranges.len(),
190 sources.len()
191 );
192 Self::build_reader_from_sources(stream_ctx, sources, None).await
193 }
194
195 async fn merge_all_flat_ranges_for_compaction(
198 stream_ctx: &Arc<StreamContext>,
199 partition_ranges: &[PartitionRange],
200 part_metrics: &PartitionMetrics,
201 ) -> Result<BoxedRecordBatchStream> {
202 let mut sources = Vec::new();
203 let range_builder_list = Arc::new(RangeBuilderList::new(
204 stream_ctx.input.num_memtables(),
205 stream_ctx.input.num_files(),
206 ));
207 for part_range in partition_ranges {
208 build_flat_sources(
209 stream_ctx,
210 part_range,
211 true,
212 part_metrics,
213 range_builder_list.clone(),
214 &mut sources,
215 None,
216 )
217 .await?;
218 }
219
220 common_telemetry::debug!(
221 "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
222 stream_ctx.input.mapper.metadata().region_id,
223 partition_ranges.len(),
224 sources.len()
225 );
226 Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
227 }
228
229 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
232 pub(crate) async fn build_reader_from_sources(
233 stream_ctx: &StreamContext,
234 mut sources: Vec<Source>,
235 semaphore: Option<Arc<Semaphore>>,
236 ) -> Result<BoxedBatchReader> {
237 if let Some(semaphore) = semaphore.as_ref() {
238 if sources.len() > 1 {
240 sources = stream_ctx
241 .input
242 .create_parallel_sources(sources, semaphore.clone())?;
243 }
244 }
245
246 let mut builder = MergeReaderBuilder::from_sources(sources);
247 let reader = builder.build().await?;
248
249 let dedup = !stream_ctx.input.append_mode;
250 let reader = if dedup {
251 match stream_ctx.input.merge_mode {
252 MergeMode::LastRow => Box::new(DedupReader::new(
253 reader,
254 LastRow::new(stream_ctx.input.filter_deleted),
255 )) as _,
256 MergeMode::LastNonNull => Box::new(DedupReader::new(
257 reader,
258 LastNonNull::new(stream_ctx.input.filter_deleted),
259 )) as _,
260 }
261 } else {
262 Box::new(reader) as _
263 };
264
265 let reader = match &stream_ctx.input.series_row_selector {
266 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
267 None => reader,
268 };
269
270 Ok(reader)
271 }
272
273 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
276 pub(crate) async fn build_flat_reader_from_sources(
277 stream_ctx: &StreamContext,
278 mut sources: Vec<BoxedRecordBatchStream>,
279 semaphore: Option<Arc<Semaphore>>,
280 ) -> Result<BoxedRecordBatchStream> {
281 if let Some(semaphore) = semaphore.as_ref() {
282 if sources.len() > 1 {
284 sources = stream_ctx
285 .input
286 .create_parallel_flat_sources(sources, semaphore.clone())?;
287 }
288 }
289
290 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
291 let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
292
293 let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
294
295 let dedup = !stream_ctx.input.append_mode;
296 let reader = if dedup {
297 match stream_ctx.input.merge_mode {
298 MergeMode::LastRow => Box::pin(
299 FlatDedupReader::new(
300 reader.into_stream().boxed(),
301 FlatLastRow::new(stream_ctx.input.filter_deleted),
302 )
303 .into_stream(),
304 ) as _,
305 MergeMode::LastNonNull => Box::pin(
306 FlatDedupReader::new(
307 reader.into_stream().boxed(),
308 FlatLastNonNull::new(
309 mapper.field_column_start(),
310 stream_ctx.input.filter_deleted,
311 ),
312 )
313 .into_stream(),
314 ) as _,
315 }
316 } else {
317 Box::pin(reader.into_stream()) as _
318 };
319
320 Ok(reader)
321 }
322
323 fn scan_partition_impl(
326 &self,
327 ctx: &QueryScanContext,
328 metrics_set: &ExecutionPlanMetricsSet,
329 partition: usize,
330 ) -> Result<SendableRecordBatchStream> {
331 if ctx.explain_verbose {
332 common_telemetry::info!(
333 "SeqScan partition {}, region_id: {}",
334 partition,
335 self.stream_ctx.input.region_metadata().region_id
336 );
337 }
338
339 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
340 let input = &self.stream_ctx.input;
341
342 let batch_stream = if input.flat_format {
343 self.scan_flat_batch_in_partition(partition, metrics.clone())?
345 } else {
346 self.scan_batch_in_partition(partition, metrics.clone())?
348 };
349 let record_batch_stream = ConvertBatchStream::new(
350 batch_stream,
351 input.mapper.clone(),
352 input.cache_strategy.clone(),
353 metrics,
354 );
355
356 Ok(Box::pin(RecordBatchStreamWrapper::new(
357 input.mapper.output_schema(),
358 Box::pin(record_batch_stream),
359 )))
360 }
361
    /// Scans the `partition` in the primary key format and returns a stream of
    /// [`ScanBatch::Normal`] batches.
    ///
    /// The partition ranges are read one after another. For each range the sources are
    /// built, merged into one reader and drained before the next range starts.
    ///
    /// # Errors
    /// Returns a `PartitionOutOfRange` error when `partition` is not a valid partition
    /// index. Errors raised while reading are yielded by the returned stream.
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        // Nothing to read: return an empty stream without touching the metrics.
        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        // Clone everything the stream needs so that it does not borrow `self`.
        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        // Compaction never gets a semaphore for scanning files.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Fails the stream if the mapper is not in the primary key format. The mapper
            // itself is only used by the debug-only checker below.
            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                // Metrics local to this range; merged into `part_metrics` once the range
                // has been fully read.
                let mut metrics = ScannerMetrics::default();
                // Building the reader is accounted as part of the scan cost.
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader.next_batch().await? {
                    // The batch is counted before the emptiness check below.
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    // Readers are not expected to return empty batches. Release builds
                    // skip them instead of yielding them.
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    // Time spent suspended at `yield` is tracked separately from the
                    // scan cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty batch after each range when `distinguish_partition_range`
                // is set — presumably so the consumer can tell ranges apart; confirm with
                // the consumer of this stream.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                // Accounts the time of the last `next_batch()` call that returned `None`.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
461
    /// Scans the `partition` in the flat format and returns a stream of
    /// [`ScanBatch::RecordBatch`] batches.
    ///
    /// The partition ranges are read one after another. For each range the flat sources
    /// are built, merged into one reader and drained before the next range starts.
    ///
    /// # Errors
    /// Returns a `PartitionOutOfRange` error when `partition` is not a valid partition
    /// index. Errors raised while reading are yielded by the returned stream.
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        // Nothing to read: return an empty stream without touching the metrics.
        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        // Clone everything the stream needs so that it does not borrow `self`.
        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction never gets a semaphore for scanning files.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                // Metrics local to this range; merged into `part_metrics` once the range
                // has been fully read.
                let mut metrics = ScannerMetrics::default();
                // Building the reader is accounted as part of the scan cost.
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;

                while let Some(record_batch) = reader.try_next().await? {
                    // The batch is counted before the emptiness check below.
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Readers are not expected to return empty batches. Release builds
                    // skip them instead of yielding them.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    // Time spent suspended at `yield` is tracked separately from the
                    // scan cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Accounts the time of the last `try_next()` call that returned `None`.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
536
537 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
538 if self.properties.target_partitions() > self.properties.num_partitions() {
539 Some(Arc::new(Semaphore::new(
545 self.properties.target_partitions() - self.properties.num_partitions() + 1,
546 )))
547 } else {
548 None
549 }
550 }
551
552 fn new_partition_metrics(
555 &self,
556 explain_verbose: bool,
557 metrics_set: &ExecutionPlanMetricsSet,
558 partition: usize,
559 ) -> PartitionMetrics {
560 let metrics = PartitionMetrics::new(
561 self.stream_ctx.input.mapper.metadata().region_id,
562 partition,
563 get_scanner_type(self.stream_ctx.input.compaction),
564 self.stream_ctx.query_start,
565 explain_verbose,
566 metrics_set,
567 );
568
569 if !self.stream_ctx.input.compaction {
570 self.metrics_list.set(partition, metrics.clone());
571 }
572
573 metrics
574 }
575
576 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
578 partition_ranges
579 .iter()
580 .map(|part_range| {
581 let range_meta = &ranges[part_range.identifier];
582 range_meta.indices.len()
583 })
584 .max()
585 .unwrap_or(0)
586 }
587
588 pub(crate) fn check_scan_limit(&self) -> Result<()> {
590 let total_max_files: usize = self
592 .properties
593 .partitions
594 .iter()
595 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
596 .sum();
597
598 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
599 if total_max_files > max_concurrent_files {
600 return TooManyFilesToReadSnafu {
601 actual: total_max_files,
602 max: max_concurrent_files,
603 }
604 .fail();
605 }
606
607 Ok(())
608 }
609}
610
611impl RegionScanner for SeqScan {
612 fn name(&self) -> &str {
613 "SeqScan"
614 }
615
616 fn properties(&self) -> &ScannerProperties {
617 &self.properties
618 }
619
620 fn schema(&self) -> SchemaRef {
621 self.stream_ctx.input.mapper.output_schema()
622 }
623
624 fn metadata(&self) -> RegionMetadataRef {
625 self.stream_ctx.input.mapper.metadata().clone()
626 }
627
628 fn scan_partition(
629 &self,
630 ctx: &QueryScanContext,
631 metrics_set: &ExecutionPlanMetricsSet,
632 partition: usize,
633 ) -> Result<SendableRecordBatchStream, BoxedError> {
634 self.scan_partition_impl(ctx, metrics_set, partition)
635 .map_err(BoxedError::new)
636 }
637
638 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
639 self.properties.prepare(request);
640
641 self.check_scan_limit().map_err(BoxedError::new)?;
642
643 Ok(())
644 }
645
646 fn has_predicate_without_region(&self) -> bool {
647 let predicate = self
648 .stream_ctx
649 .input
650 .predicate_group()
651 .predicate_without_region();
652 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
653 }
654
655 fn set_logical_region(&mut self, logical_region: bool) {
656 self.properties.set_logical_region(logical_region);
657 }
658}
659
660impl DisplayAs for SeqScan {
661 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
662 write!(
663 f,
664 "SeqScan: region={}, ",
665 self.stream_ctx.input.mapper.metadata().region_id
666 )?;
667 match t {
668 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
670 self.stream_ctx.format_for_explain(false, f)
671 }
672 DisplayFormatType::Verbose => {
673 self.stream_ctx.format_for_explain(true, f)?;
674 self.metrics_list.format_verbose_metrics(f)
675 }
676 }
677 }
678}
679
680impl fmt::Debug for SeqScan {
681 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682 f.debug_struct("SeqScan")
683 .field("num_ranges", &self.stream_ctx.ranges.len())
684 .finish()
685 }
686}
687
/// Builds the sources of the `part_range` and appends them to `sources`.
///
/// Sources are appended in the order of `row_group_indices` of the range meta, regardless
/// of the order in which the file scans finish.
///
/// - `compaction`: selects the read type passed to the file scans and enables extra
///   assertions in debug builds.
/// - `semaphore`: when given, scans of file ranges are deferred and awaited together, each
///   one holding a permit while its stream is being created. Otherwise every file range is
///   scanned inline, one by one.
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // In compaction mode every index is expected to have `row_group_index == -1`, which
    // per the assertion message means the range scans all row groups.
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    sources.reserve(num_indices);
    // One slot per index so that sources built out of order still end up in position order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    // Deferred file scans; only used when a semaphore is given.
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Clone what the task needs so that the future owns its captures.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // The permit is released when this future completes.
                    // NOTE(review): `acquire()` only fails if the semaphore is closed;
                    // nothing in this file closes it — confirm for other users.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Source::Stream(Box::pin(stream) as _)))
                });
            } else {
                // No semaphore: scan the file range right away.
                let stream = scan_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
            }
        } else {
            // Neither a memtable range nor a file range.
            let stream =
                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(Source::Stream(stream));
        }
    }

    if !file_scan_tasks.is_empty() {
        // Drives all deferred scans concurrently and fails on the first error.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, source) in results {
            ordered_sources[position] = Some(source);
        }
    }

    // Appends the filled slots in position order.
    for source in ordered_sources.into_iter().flatten() {
        sources.push(source);
    }
    Ok(())
}
789
/// Builds the flat format sources of the `part_range` and appends them to `sources`.
///
/// Sources are appended in the order of `row_group_indices` of the range meta, regardless
/// of the order in which the file scans finish. When
/// `should_split_flat_batches_for_merge` returns true for the range, every source is wrapped
/// in a [`SplitRecordBatchStream`].
///
/// - `compaction`: selects the read type passed to the file scans and enables extra
///   assertions in debug builds.
/// - `semaphore`: when given, scans of file ranges are deferred and awaited together, each
///   one holding a permit while its stream is being created. Otherwise every file range is
///   scanned inline, one by one.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // In compaction mode every index is expected to have `row_group_index == -1`, which
    // per the assertion message means the range scans all row groups.
    #[cfg(debug_assertions)]
    if compaction {
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    sources.reserve(num_indices);
    // One slot per index so that sources built out of order still end up in position order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    // Deferred file scans; only used when a semaphore is given.
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Clone what the task needs so that the future owns its captures.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // The permit is released when this future completes.
                    // NOTE(review): `acquire()` only fails if the semaphore is closed;
                    // nothing in this file closes it — confirm for other users.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // No semaphore: scan the file range right away.
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            // Neither a memtable range nor a file range.
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    if !file_scan_tasks.is_empty() {
        // Drives all deferred scans concurrently and fails on the first error.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    // Appends the filled slots in position order, splitting the batches if required.
    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        // `sources.len()` also counts sources that were in `sources` before this call.
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(())
}
901
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input of the scanner. Only available in tests.
    pub(crate) fn input(&self) -> &ScanInput {
        let Self { stream_ctx, .. } = self;
        &stream_ctx.input
    }
}
909
/// Returns the scanner type label used for metrics: `"SeqScan(compaction)"` for compaction
/// scans and `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    if !compaction {
        return "SeqScan";
    }
    "SeqScan(compaction)"
}