1use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33 PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47 PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48 scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49 should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53 Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
/// Region scanner that reads the partition ranges of each partition one after
/// another, merging (and, unless in append mode, deduplicating) the sources
/// of every range.
pub struct SeqScan {
    /// Scanner properties; `partitions` holds the partition ranges assigned to
    /// each output partition.
    properties: ScannerProperties,
    /// Shared context (scan input and range metadata) used to build streams.
    stream_ctx: Arc<StreamContext>,
    /// Metrics of each partition, printed by the verbose `DisplayAs` output.
    /// Only filled for non-compaction scans.
    metrics_list: PartitionMetricsList,
}
71
72impl SeqScan {
73 pub(crate) fn new(input: ScanInput) -> Self {
76 let mut properties = ScannerProperties::default()
77 .with_append_mode(input.append_mode)
78 .with_total_rows(input.total_rows());
79 let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80 properties.partitions = vec![stream_ctx.partition_ranges()];
81
82 Self {
83 properties,
84 stream_ctx,
85 metrics_list: PartitionMetricsList::default(),
86 }
87 }
88
89 #[tracing::instrument(
94 skip_all,
95 fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
96 )]
97 pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
98 let metrics_set = ExecutionPlanMetricsSet::new();
99 let streams = (0..self.properties.partitions.len())
100 .map(|partition: usize| {
101 self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
102 })
103 .collect::<Result<Vec<_>, _>>()?;
104
105 let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
106 Ok(Box::pin(aggr_stream))
107 }
108
109 pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
111 let metrics_set = ExecutionPlanMetricsSet::new();
112
113 let streams = (0..self.properties.partitions.len())
114 .map(|partition| {
115 let metrics = self.new_partition_metrics(false, &metrics_set, partition);
116 self.scan_batch_in_partition(partition, metrics)
117 })
118 .collect::<Result<Vec<_>>>()?;
119
120 Ok(Box::pin(futures::stream::iter(streams).flatten()))
121 }
122
123 pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
128 assert!(self.stream_ctx.input.compaction);
129
130 let metrics_set = ExecutionPlanMetricsSet::new();
131 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
132 debug_assert_eq!(1, self.properties.partitions.len());
133 let partition_ranges = &self.properties.partitions[0];
134
135 let reader = Self::merge_all_ranges_for_compaction(
136 &self.stream_ctx,
137 partition_ranges,
138 &part_metrics,
139 )
140 .await?;
141 Ok(Box::new(reader))
142 }
143
144 pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
149 assert!(self.stream_ctx.input.compaction);
150
151 let metrics_set = ExecutionPlanMetricsSet::new();
152 let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
153 debug_assert_eq!(1, self.properties.partitions.len());
154 let partition_ranges = &self.properties.partitions[0];
155
156 let reader = Self::merge_all_flat_ranges_for_compaction(
157 &self.stream_ctx,
158 partition_ranges,
159 &part_metrics,
160 )
161 .await?;
162 Ok(reader)
163 }
164
165 async fn merge_all_ranges_for_compaction(
168 stream_ctx: &Arc<StreamContext>,
169 partition_ranges: &[PartitionRange],
170 part_metrics: &PartitionMetrics,
171 ) -> Result<BoxedBatchReader> {
172 let mut sources = Vec::new();
173 let range_builder_list = Arc::new(RangeBuilderList::new(
174 stream_ctx.input.num_memtables(),
175 stream_ctx.input.num_files(),
176 ));
177 for part_range in partition_ranges {
178 build_sources(
179 stream_ctx,
180 part_range,
181 true,
182 part_metrics,
183 range_builder_list.clone(),
184 &mut sources,
185 None,
186 )
187 .await?;
188 }
189
190 common_telemetry::debug!(
191 "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
192 stream_ctx.input.mapper.metadata().region_id,
193 partition_ranges.len(),
194 sources.len()
195 );
196 Self::build_reader_from_sources(stream_ctx, sources, None, None).await
197 }
198
199 async fn merge_all_flat_ranges_for_compaction(
202 stream_ctx: &Arc<StreamContext>,
203 partition_ranges: &[PartitionRange],
204 part_metrics: &PartitionMetrics,
205 ) -> Result<BoxedRecordBatchStream> {
206 let mut sources = Vec::new();
207 let range_builder_list = Arc::new(RangeBuilderList::new(
208 stream_ctx.input.num_memtables(),
209 stream_ctx.input.num_files(),
210 ));
211 for part_range in partition_ranges {
212 build_flat_sources(
213 stream_ctx,
214 part_range,
215 true,
216 part_metrics,
217 range_builder_list.clone(),
218 &mut sources,
219 None,
220 )
221 .await?;
222 }
223
224 common_telemetry::debug!(
225 "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
226 stream_ctx.input.mapper.metadata().region_id,
227 partition_ranges.len(),
228 sources.len()
229 );
230 Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
231 }
232
233 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
236 pub(crate) async fn build_reader_from_sources(
237 stream_ctx: &StreamContext,
238 mut sources: Vec<Source>,
239 semaphore: Option<Arc<Semaphore>>,
240 part_metrics: Option<&PartitionMetrics>,
241 ) -> Result<BoxedBatchReader> {
242 if let Some(semaphore) = semaphore.as_ref() {
243 if sources.len() > 1 {
245 sources = stream_ctx
246 .input
247 .create_parallel_sources(sources, semaphore.clone())?;
248 }
249 }
250
251 let mut builder = MergeReaderBuilder::from_sources(sources);
252 if let Some(metrics) = part_metrics {
253 builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
254 }
255 let reader = builder.build().await?;
256
257 let dedup = !stream_ctx.input.append_mode;
258 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
259 let reader = if dedup {
260 match stream_ctx.input.merge_mode {
261 MergeMode::LastRow => Box::new(DedupReader::new(
262 reader,
263 LastRow::new(stream_ctx.input.filter_deleted),
264 dedup_metrics_reporter,
265 )) as _,
266 MergeMode::LastNonNull => Box::new(DedupReader::new(
267 reader,
268 LastNonNull::new(stream_ctx.input.filter_deleted),
269 dedup_metrics_reporter,
270 )) as _,
271 }
272 } else {
273 Box::new(reader) as _
274 };
275
276 let reader = match &stream_ctx.input.series_row_selector {
277 Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
278 None => reader,
279 };
280
281 Ok(reader)
282 }
283
284 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
287 pub(crate) async fn build_flat_reader_from_sources(
288 stream_ctx: &StreamContext,
289 mut sources: Vec<BoxedRecordBatchStream>,
290 semaphore: Option<Arc<Semaphore>>,
291 part_metrics: Option<&PartitionMetrics>,
292 ) -> Result<BoxedRecordBatchStream> {
293 if let Some(semaphore) = semaphore.as_ref() {
294 if sources.len() > 1 {
296 sources = stream_ctx
297 .input
298 .create_parallel_flat_sources(sources, semaphore.clone())?;
299 }
300 }
301
302 let mapper = stream_ctx.input.mapper.as_flat().unwrap();
303 let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
304
305 let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
306 let reader =
307 FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
308 .await?;
309
310 let dedup = !stream_ctx.input.append_mode;
311 let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
312 let reader = if dedup {
313 match stream_ctx.input.merge_mode {
314 MergeMode::LastRow => Box::pin(
315 FlatDedupReader::new(
316 reader.into_stream().boxed(),
317 FlatLastRow::new(stream_ctx.input.filter_deleted),
318 dedup_metrics_reporter,
319 )
320 .into_stream(),
321 ) as _,
322 MergeMode::LastNonNull => Box::pin(
323 FlatDedupReader::new(
324 reader.into_stream().boxed(),
325 FlatLastNonNull::new(
326 mapper.field_column_start(),
327 stream_ctx.input.filter_deleted,
328 ),
329 dedup_metrics_reporter,
330 )
331 .into_stream(),
332 ) as _,
333 }
334 } else {
335 Box::pin(reader.into_stream()) as _
336 };
337
338 Ok(reader)
339 }
340
341 fn scan_partition_impl(
344 &self,
345 ctx: &QueryScanContext,
346 metrics_set: &ExecutionPlanMetricsSet,
347 partition: usize,
348 ) -> Result<SendableRecordBatchStream> {
349 if ctx.explain_verbose {
350 common_telemetry::info!(
351 "SeqScan partition {}, region_id: {}",
352 partition,
353 self.stream_ctx.input.region_metadata().region_id
354 );
355 }
356
357 let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
358 let input = &self.stream_ctx.input;
359
360 let batch_stream = if input.flat_format {
361 self.scan_flat_batch_in_partition(partition, metrics.clone())?
363 } else {
364 self.scan_batch_in_partition(partition, metrics.clone())?
366 };
367 let record_batch_stream = ConvertBatchStream::new(
368 batch_stream,
369 input.mapper.clone(),
370 input.cache_strategy.clone(),
371 metrics,
372 );
373
374 Ok(Box::pin(RecordBatchStreamWrapper::new(
375 input.mapper.output_schema(),
376 Box::pin(record_batch_stream),
377 )))
378 }
379
380 #[tracing::instrument(
381 skip_all,
382 fields(
383 region_id = %self.stream_ctx.input.mapper.metadata().region_id,
384 partition = partition
385 )
386 )]
387 fn scan_batch_in_partition(
388 &self,
389 partition: usize,
390 part_metrics: PartitionMetrics,
391 ) -> Result<ScanBatchStream> {
392 ensure!(
393 partition < self.properties.partitions.len(),
394 PartitionOutOfRangeSnafu {
395 given: partition,
396 all: self.properties.partitions.len(),
397 }
398 );
399
400 if self.properties.partitions[partition].is_empty() {
401 return Ok(Box::pin(futures::stream::empty()));
402 }
403
404 let stream_ctx = self.stream_ctx.clone();
405 let semaphore = self.new_semaphore();
406 let partition_ranges = self.properties.partitions[partition].clone();
407 let compaction = self.stream_ctx.input.compaction;
408 let distinguish_range = self.properties.distinguish_partition_range;
409 let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
410
411 let stream = try_stream! {
412 part_metrics.on_first_poll();
413 let mut fetch_start = Instant::now();
416
417 let range_builder_list = Arc::new(RangeBuilderList::new(
418 stream_ctx.input.num_memtables(),
419 stream_ctx.input.num_files(),
420 ));
421 let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
422 reason: "Unexpected format",
423 })?;
424 for part_range in partition_ranges {
426 let mut sources = Vec::new();
427 build_sources(
428 &stream_ctx,
429 &part_range,
430 compaction,
431 &part_metrics,
432 range_builder_list.clone(),
433 &mut sources,
434 file_scan_semaphore.clone(),
435 ).await?;
436
437 let mut reader =
438 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
439 .await?;
440 #[cfg(debug_assertions)]
441 let mut checker = crate::read::BatchChecker::default()
442 .with_start(Some(part_range.start))
443 .with_end(Some(part_range.end));
444
445 let mut metrics = ScannerMetrics {
446 scan_cost: fetch_start.elapsed(),
447 ..Default::default()
448 };
449 fetch_start = Instant::now();
450
451 while let Some(batch) = reader.next_batch().await? {
452 metrics.scan_cost += fetch_start.elapsed();
453 metrics.num_batches += 1;
454 metrics.num_rows += batch.num_rows();
455
456 debug_assert!(!batch.is_empty());
457 if batch.is_empty() {
458 fetch_start = Instant::now();
459 continue;
460 }
461
462 #[cfg(debug_assertions)]
463 checker.ensure_part_range_batch(
464 "SeqScan",
465 _mapper.metadata().region_id,
466 partition,
467 part_range,
468 &batch,
469 );
470
471 let yield_start = Instant::now();
472 yield ScanBatch::Normal(batch);
473 metrics.yield_cost += yield_start.elapsed();
474
475 fetch_start = Instant::now();
476 }
477
478 if distinguish_range {
481 let yield_start = Instant::now();
482 yield ScanBatch::Normal(Batch::empty());
483 metrics.yield_cost += yield_start.elapsed();
484 }
485
486 metrics.scan_cost += fetch_start.elapsed();
487 fetch_start = Instant::now();
488 part_metrics.merge_metrics(&metrics);
489 }
490
491 part_metrics.on_finish();
492 };
493 Ok(Box::pin(stream))
494 }
495
    /// Scans the given `partition` in the flat format and returns a stream of
    /// [ScanBatch::RecordBatch].
    ///
    /// The ranges of the partition are read one by one: for each range the flat
    /// sources are built, merged into one reader and drained.
    ///
    /// # Errors
    /// Returns a `PartitionOutOfRange` error if `partition` is not a valid
    /// partition index. The stream itself yields any error raised while
    /// building or reading the sources.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        // Nothing to read for this partition.
        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction builds file sources without a semaphore; other scans reuse
        // the partition-level one.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start of the current "fetch" interval; reset after every yield so
            // that time spent in the consumer is not counted as scan cost.
            let mut fetch_start = Instant::now();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;

                // Building the sources and the reader counts as scan cost.
                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are unexpected; release builds skip them.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Accounts for the final poll that reported the end of the reader.
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
586
587 fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
588 if self.properties.target_partitions() > self.properties.num_partitions() {
589 Some(Arc::new(Semaphore::new(
595 self.properties.target_partitions() - self.properties.num_partitions() + 1,
596 )))
597 } else {
598 None
599 }
600 }
601
602 fn new_partition_metrics(
605 &self,
606 explain_verbose: bool,
607 metrics_set: &ExecutionPlanMetricsSet,
608 partition: usize,
609 ) -> PartitionMetrics {
610 let metrics = PartitionMetrics::new(
611 self.stream_ctx.input.mapper.metadata().region_id,
612 partition,
613 get_scanner_type(self.stream_ctx.input.compaction),
614 self.stream_ctx.query_start,
615 explain_verbose,
616 metrics_set,
617 );
618
619 if !self.stream_ctx.input.compaction {
620 self.metrics_list.set(partition, metrics.clone());
621 }
622
623 metrics
624 }
625
626 fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
628 partition_ranges
629 .iter()
630 .map(|part_range| {
631 let range_meta = &ranges[part_range.identifier];
632 range_meta.indices.len()
633 })
634 .max()
635 .unwrap_or(0)
636 }
637
638 pub(crate) fn check_scan_limit(&self) -> Result<()> {
640 let total_max_files: usize = self
642 .properties
643 .partitions
644 .iter()
645 .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
646 .sum();
647
648 let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
649 if total_max_files > max_concurrent_files {
650 return TooManyFilesToReadSnafu {
651 actual: total_max_files,
652 max: max_concurrent_files,
653 }
654 .fail();
655 }
656
657 Ok(())
658 }
659}
660
661impl RegionScanner for SeqScan {
662 fn name(&self) -> &str {
663 "SeqScan"
664 }
665
666 fn properties(&self) -> &ScannerProperties {
667 &self.properties
668 }
669
670 fn schema(&self) -> SchemaRef {
671 self.stream_ctx.input.mapper.output_schema()
672 }
673
674 fn metadata(&self) -> RegionMetadataRef {
675 self.stream_ctx.input.mapper.metadata().clone()
676 }
677
678 fn scan_partition(
679 &self,
680 ctx: &QueryScanContext,
681 metrics_set: &ExecutionPlanMetricsSet,
682 partition: usize,
683 ) -> Result<SendableRecordBatchStream, BoxedError> {
684 self.scan_partition_impl(ctx, metrics_set, partition)
685 .map_err(BoxedError::new)
686 }
687
688 fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
689 self.properties.prepare(request);
690
691 self.check_scan_limit().map_err(BoxedError::new)?;
692
693 Ok(())
694 }
695
696 fn has_predicate_without_region(&self) -> bool {
697 let predicate = self
698 .stream_ctx
699 .input
700 .predicate_group()
701 .predicate_without_region();
702 predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
703 }
704
705 fn set_logical_region(&mut self, logical_region: bool) {
706 self.properties.set_logical_region(logical_region);
707 }
708}
709
710impl DisplayAs for SeqScan {
711 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
712 write!(
713 f,
714 "SeqScan: region={}, ",
715 self.stream_ctx.input.mapper.metadata().region_id
716 )?;
717 match t {
718 DisplayFormatType::Default | DisplayFormatType::TreeRender => {
720 self.stream_ctx.format_for_explain(false, f)
721 }
722 DisplayFormatType::Verbose => {
723 self.stream_ctx.format_for_explain(true, f)?;
724 self.metrics_list.format_verbose_metrics(f)
725 }
726 }
727 }
728}
729
730impl fmt::Debug for SeqScan {
731 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
732 f.debug_struct("SeqScan")
733 .field("num_ranges", &self.stream_ctx.ranges.len())
734 .finish()
735 }
736}
737
/// Builds the sources of the partition range `part_range` and appends them to
/// `sources`.
///
/// One source is built per entry of the range's `row_group_indices`, and the
/// sources are appended in the order of those entries.
///
/// - `compaction` selects the read type label passed to the file scans and, in
///   debug builds, enables extra assertions on the range meta.
/// - `semaphore`, when set, makes file ranges get scanned by tasks that are
///   awaited together, each holding a permit while it builds its stream.
///   Without it file ranges are scanned one by one.
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // For compaction, every index is expected to have exactly one row group
        // index entry.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // Per the assertion message, -1 stands for "all row groups".
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    sources.reserve(num_indices);
    // Slots indexed by position, so the output order does not depend on the
    // order in which sources become available.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Defer the file scan to a task; it is awaited after the loop.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // The permit is held until the stream has been built.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Source::Stream(Box::pin(stream) as _)))
                });
            } else {
                // No semaphore: scan the file range right away.
                let stream = scan_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
            }
        } else {
            // Neither a memtable range nor a file range.
            let stream =
                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(Source::Stream(stream));
        }
    }

    if !file_scan_tasks.is_empty() {
        // Fails with the first error returned by any task.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, source) in results {
            ordered_sources[position] = Some(source);
        }
    }

    for source in ordered_sources.into_iter().flatten() {
        sources.push(source);
    }
    Ok(())
}
839
/// Builds the flat-format sources of the partition range `part_range` and
/// appends them to `sources`.
///
/// One source is built per entry of the range's `row_group_indices`, and the
/// sources are appended in the order of those entries. Each source is wrapped
/// in a [SplitRecordBatchStream] when `should_split_flat_batches_for_merge`
/// returns true for the range.
///
/// - `compaction` selects the read type label passed to the file scans and, in
///   debug builds, enables extra assertions on the range meta.
/// - `semaphore`, when set, makes file ranges get scanned by tasks that are
///   awaited together, each holding a permit while it builds its stream.
///   Without it file ranges are scanned one by one.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // For compaction, every index is expected to have exactly one row group
        // index entry.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // Per the assertion message, -1 stands for "all row groups".
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    sources.reserve(num_indices);
    // Slots indexed by position, so the output order does not depend on the
    // order in which sources become available.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // Defer the file scan to a task; it is awaited after the loop.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // The permit is held until the stream has been built.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // No semaphore: scan the file range right away.
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            // Neither a memtable range nor a file range.
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    if !file_scan_tasks.is_empty() {
        // Fails with the first error returned by any task.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(())
}
951
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input of the scanner. Only compiled for tests.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
959
/// Returns the scanner type label used for partition metrics:
/// `"SeqScan(compaction)"` for compaction scans and `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    if !compaction {
        return "SeqScan";
    }
    "SeqScan(compaction)"
}